How to pushdown limit predicate for Cassandra when you use dataframes?

I have a large Cassandra table. I want to load only 50 rows from it. Here is the code:

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")
      .limit(50).collect()

The code above pushes down both predicates from the where methods, but not the limit. Is it true that the whole data set (1 million records) is being fetched? If not, why is the run time of this code roughly the same as that of the code without limit(50)?

Cloistral answered 28/3, 2018 at 12:28 Comment(3)
Just a guess: if you actually have fewer than 50 records meeting the predicate, Spark has to go through the whole table in order to check whether there are more. – Thilda
@Thilda no, that's not the case, more than 10k records meet the predicate. – Cloistral
limit doesn't translate to CQL's LIMIT by default. However, if you work with the underlying RDD, you can asInstanceOf[CassandraRDD], where a specially defined limit method is available for CQL. See the edited answer. – Thilda

Unlike Spark Streaming, Spark itself tries to preload as much data as it can, as fast as it can, so that it can operate on it in parallel. So preloading is lazy, but greedy once it's triggered. There are cassandra-connector-specific factors, however:

  • Automatic predicate pushdown of valid "where" clauses.

  • According to this answer, limit(...) is not translated to CQL's LIMIT, so its behavior depends on how many fetching jobs are created after enough data has been downloaded (a way to check this with explain() is sketched right after this list). Quote:

calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.
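
A quick way to check both points is to look at the physical plan before collecting. A minimal sketch, reusing the DataFrame from the question (the val name filtered is mine, and the exact plan node names vary with the Spark and connector versions):

val filtered = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")

// Pushed-down predicates typically show up as PushedFilters on the Cassandra
// scan node, while limit(50) appears as a Spark-side CollectLimit/GlobalLimit
// step rather than a CQL LIMIT.
filtered.limit(50).explain()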

Possible solutions:

  • DataFrame limits could be partially managed by limiting numPartitions and the data exchange rate (concurrent.reads and other connector read parameters); a rough config sketch follows after the RDD example below. If you're okay with n ~ 50 "in most cases", you could also add an extra filter like where(dayIndex < 50 * factor * num_records).

  • There is a way to set a CQL LIMIT through SparkPartitionLimit, which directly affects every CQL request (see more) - keep in mind that requests are per Spark partition. It's available in the CassandraRDD extension class, so you would have to convert to an RDD first.

The code would be something like:

filteredDataFrame.rdd.asInstanceOf[CassandraRDD[Row]].limit(n).take(n)

This would append LIMIT $N to every CQL request. Unlike with DataFrame's limit, if you specify the CassandraRDD limit several times (.limit(10).limit(20)), only the last one is appended. Also, I used n instead of n / numPartitions + 1 because (even if Spark and Cassandra partitions are one-to-one) a partition might return fewer results. As a result, I had to add take(n) in order to cut the <= numPartitions * n rows down to n.
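
For the first bullet above, a rough sketch of the connector's read-tuning knobs is below. The parameter names (spark.cassandra.input.split.size_in_mb, spark.cassandra.input.fetch.size_in_rows) are the 2.x connector's read options as I remember them - verify them against your connector's reference documentation, and treat the values and the connection host as placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// These settings shape how many Spark partitions and CQL requests get created;
// they do not by themselves cap how many rows come back.
val sparkSession = SparkSession.builder()
      .config("spark.cassandra.connection.host", "127.0.0.1")    // placeholder
      .config("spark.cassandra.input.split.size_in_mb", "512")   // fewer, larger partitions
      .config("spark.cassandra.input.fetch.size_in_rows", "100") // rows per round-trip
      .getOrCreate()

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")

// limit(50) still runs on the Spark side, but with fewer scan tasks there is
// less work to cancel once 50 rows are available.
ds.limit(50).collect()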

Warning: double-check that your where clauses are translatable to CQL (using explain()) - otherwise the LIMIT would be applied before filtering.

P.S. You could also try to run CQL directly using sparkSession.sql(...) (like here) and compare results.
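
A rough sketch of that comparison, assuming the connector's temporary-view syntax (the view name aggregates_view is just a placeholder; check the OPTIONS spelling for your connector version):

// Register the Cassandra table as a temporary view so it can be queried via Spark SQL.
sparkSession.sql(
  s"""CREATE TEMPORARY VIEW aggregates_view
     |USING org.apache.spark.sql.cassandra
     |OPTIONS (table "$Aggregates", keyspace "$KeySpace")""".stripMargin)

// As far as I can tell, the SQL LIMIT below is still applied by Spark rather than
// being turned into a CQL LIMIT, so this mainly serves as a timing comparison.
val rows = sparkSession.sql(
  """SELECT * FROM aggregates_view
    |WHERE aggregate_type = 'DAY' AND start_time <= '2018-03-28'
    |LIMIT 50""".stripMargin).collect()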

Thilda answered 1/4, 2018 at 20:37 Comment(0)
