Spark JoinWithCassandraTable on TimeStamp partition key STUCK

I'm trying to filter on a small part of a huge C* table by using:

    val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tspark")

    println("Done Join")
    //*******
    //get only the snapshots and create rdd temp table
    val jsons = snapshotsFiltered.map(_._2.getString("snapshot"))
    val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
    jsonSchemaRDD.registerTempTable("snapshots_json")

With:

    case class TableKey(created: Long) //(created, imei, when) --> created = partition key | imei, when = clustering keys

And the Cassandra table schema is:

    CREATE TABLE listener.snapshots_tspark (
        created timestamp,
        imei text,
        when timestamp,
        snapshot text,
        PRIMARY KEY (created, imei, when)
    ) WITH CLUSTERING ORDER BY (imei ASC, when ASC)
        AND bloom_filter_fp_chance = 0.01
        AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
        AND comment = ''
        AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
        AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99.0PERCENTILE';
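
For reference, here is a minimal sketch of the surrounding setup the snippet at the top of the question assumes (the master URL and Cassandra host are placeholders; the connector import is what makes joinWithCassandraTable available on plain RDDs):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import com.datastax.spark.connector._  // adds joinWithCassandraTable to RDDs

    // Placeholder master URL and Cassandra contact point, for illustration only.
    val conf = new SparkConf()
      .setAppName("snapshots-join")
      .setMaster("spark://master:7077")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)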

The problem is that the process freezes right after the println is executed, with no errors in the Spark master UI.

[Stage 0:>                                                                                                                                (0 + 2) / 2]

Won't the join work with a timestamp as the partition key? Why does it freeze?

Sikata asked 25/10, 2015 at 12:08. Comments (9):
Stephniestepladder: Did you check if there are enough resources for the job to run?
Sikata: @Stephniestepladder Yes. Memory: 5.5 GB total, 512.0 MB used.
Sikata: If the collection snapshotsFiltered comes back empty, will the next stage get stuck?
Stephniestepladder: No, that's not the reason. It would mainly get stuck from a lack of resources. Here it might be the complexity of the query plan you are trying to execute.
Sikata: @Stephniestepladder Why is it complex? It is only supposed to cluster on the created timestamp and bring back the rows where created >= startDate and created <= endDate. The whole point of using joinWithCassandraTable (rather than sc.cassandraTable("listener","snapshots_test_c").where("created >= " + startDate + " and created <= " + endDate)) is to save time.
Sikata: When I use where (or, even worse, filter) it takes 10 hours because Spark loads the whole table into memory. I need to push the job back to Cassandra, which is why I want to use joinWithCassandraTable, as described at datastax.com/dev/blog/zen-art-spark-maintenance
Stephniestepladder: Out of curiosity, did you try to break your table join into separate steps?
Sikata: @Stephniestepladder Yes. That went fine. It freezes after the join is done.
Stephniestepladder: Let us continue this discussion in chat.

By using:

    sc.parallelize(startDate to endDate)

With startDate and endDate as Longs generated from Dates in the format:

    ("yyyy-MM-dd HH:mm:ss")

I made Spark build a huge array (100,000+ objects) to join with the C* table, and it was not actually stuck at all: C* was working hard to make the join happen and return the data.
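
To give a sense of scale, here is a rough sketch (with hypothetical dates, and assuming the Longs are seconds since the epoch) of how quickly that key count grows:

    import java.text.SimpleDateFormat

    // One TableKey per Long in the range means one partition lookup per second,
    // so even a single day yields tens of thousands of keys (and 1000x more at
    // millisecond granularity).
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val startDate = fmt.parse("2015-10-29 00:00:00").getTime / 1000
    val endDate   = fmt.parse("2015-10-30 00:00:00").getTime / 1000
    println((startDate to endDate).size)  // 86401 candidate partition keys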

Finally, I changed my range to:

    case class TableKey(created_dh: String)
    val data = Array("2015-10-29 12:00:00", "2015-10-29 13:00:00", "2015-10-29 14:00:00", "2015-10-29 15:00:00")
    val snapshotsFiltered = sc.parallelize(data, 2).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tnew")

And it is ok now.
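
For what it's worth, the hard-coded array can also be generated. Below is a hedged sketch, assuming the new table partitions on the hourly text column created_dh, that the keys fall on exact hour boundaries, and that it runs where sc is already defined (e.g. the spark-shell):

    import java.text.SimpleDateFormat
    import java.util.Calendar
    import com.datastax.spark.connector._  // provides joinWithCassandraTable

    // Hypothetical helper: one partition-key string per hour between start and end
    // (both assumed to be exact "yyyy-MM-dd HH:00:00" hour boundaries).
    def hourlyKeys(start: String, end: String): Seq[String] = {
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val endTime = fmt.parse(end)
      val cal = Calendar.getInstance()
      cal.setTime(fmt.parse(start))
      val keys = scala.collection.mutable.ArrayBuffer[String]()
      while (!cal.getTime.after(endTime)) {
        keys += fmt.format(cal.getTime)
        cal.add(Calendar.HOUR_OF_DAY, 1)
      }
      keys
    }

    val data = hourlyKeys("2015-10-29 12:00:00", "2015-10-29 15:00:00")
    val snapshotsFiltered = sc.parallelize(data, 2).map(TableKey(_))
      .joinWithCassandraTable("listener", "snapshots_tnew")

Keeping the number of generated keys small (a handful of hourly buckets rather than one key per second) is what keeps the join fast.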

Sikata answered 29/10, 2015 at 19:48.
