Unlike Spark Streaming, Spark itself tries to preload as much data as it can, as fast as it can, so that it can operate on it in parallel. So preloading is lazy, but greedy once it's triggered. There are cassandra-connector-specific factors, however:
Automatic predicate pushdown of valid `where` clauses.
According to this answer, `limit(...)` is not translated to CQL's `LIMIT`, so its behavior depends on how many fetch jobs are cancelled once enough data has been downloaded. Quote:
> calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.
Possible solutions:
DataFrame limits can be partially managed by capping `numPartitions` and the data exchange rate (`concurrent.reads` and other params). If you're okay with n ~ 50 "in most cases", you could also bound the scan with something like `where(dayIndex < 50 * factor * num_records)`.
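A minimal sketch of this tuning, assuming spark-cassandra-connector 2.x property names (treat the exact config keys, the keyspace/table, and the constants as assumptions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.cassandra.concurrent.reads", "8")        // assumed key: throttles concurrent reads
  .config("spark.cassandra.input.reads_per_sec", "1000")  // assumed key: caps fetch rate
  .getOrCreate()

val factor = 2        // hypothetical safety factor
val numRecords = 1    // hypothetical records-per-day estimate
val numPartitions = 4

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "events")) // hypothetical names
  .load()
  .where(s"dayIndex < ${50 * factor * numRecords}")      // coarse upper bound on the scan
  .coalesce(numPartitions)                               // fewer partitions => fewer fetch tasks
  .limit(50)
```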
There is a way to set CQL `LIMIT` through `SparkPartitionLimit`, which directly affects every CQL request (see more) - keep in mind that the requests are per-Spark-partition. It's available in the `CassandraRDD` extension class, so you would have to convert to an RDD first.
The code would be something like:
```scala
// take(n) trims the up-to numPartitions * n rows down to n (see below)
filteredDataFrame.rdd.asInstanceOf[CassandraRDD[Row]].limit(n).take(n)
```
This would append `LIMIT $N` to every CQL request. Unlike `DataFrame`'s limit, if you specify the `CassandraRDD` limit several times (`.limit(10).limit(20)`), only the last one is appended. Also, I used `n` instead of `n / numPartitions + 1` because (even if Spark and Cassandra partitions are one-to-one) a partition might return fewer results. As a result, I had to add `take(n)` in order to cut the `<= numPartitions * n` rows down to `n`.
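Put together, a minimal end-to-end sketch of this approach (the keyspace, table, and `dayIndex` filter are hypothetical, and the cast follows this answer's assumption that the underlying RDD is a `CassandraRDD`):

```scala
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder()
  .appName("cassandra-rdd-limit-sketch")
  .config("spark.cassandra.connection.host", "127.0.0.1") // assumption: local node
  .getOrCreate()

val n = 50

// Pushdown-compatible filter first (verify with explain(), see the warning below),
// then drop to the RDD level to reach CassandraRDD's limit().
val filteredDataFrame = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "events")) // hypothetical names
  .load()
  .where("dayIndex < 10")

val firstN: Array[Row] = filteredDataFrame.rdd
  .asInstanceOf[CassandraRDD[Row]]
  .limit(n)  // appends "LIMIT 50" to each per-partition CQL request
  .take(n)   // trims the up-to numPartitions * n rows down to n
```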
Warning: double-check that your `where`s are translatable to CQL (using `explain()`) - otherwise the `LIMIT` would be applied before filtering.
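A quick check, reusing `filteredDataFrame` from above:

```scala
// If the predicate shows up under "PushedFilters" in the physical plan, Cassandra
// applies it before the per-partition LIMIT; if it only appears in a Spark "Filter"
// node, the LIMIT runs first and the filtering happens afterwards in Spark.
filteredDataFrame.explain()
```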
P.S. You could also try to run CQL directly using `sparkSession.sql(...)` (like here) and compare the results.
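For example (the temp-view name and query are illustrative):

```scala
// Register the Cassandra-backed table as a temp view, then compare SQL LIMIT
// behavior against the RDD-level approach above.
spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "events")) // hypothetical names
  .load()
  .createOrReplaceTempView("events")

val viaSql = spark.sql("SELECT * FROM events WHERE dayIndex < 10 LIMIT 50")
viaSql.explain() // compare plans and row counts
```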