How to write streaming Dataset to Cassandra?

So I have a stream-sourced DataFrame df in Python that has all the data I want to place into a Cassandra table using the spark-cassandra-connector. I've tried doing this in two ways:

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .save() 

query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .start()

query.awaitTermination()

However, I keep getting these errors, respectively:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;

and

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.

Is there any way I can send my streamed DataFrame into my Cassandra table?

Mitman answered 15/7, 2017 at 1:16 Comment(0)

There is currently no streaming Sink for Cassandra in the Spark Cassandra Connector. You will need to implement your own Sink or wait for it to become available.

If you were using Scala or Java, you could use the foreach operator with a ForeachWriter, as described in Using Foreach.
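
For illustration, a minimal Scala sketch of such a ForeachWriter might look like the following. It reuses the keyspace and table from the question, but the column names key and value are hypothetical placeholders, spark is assumed to be your SparkSession, and the one-INSERT-per-row approach is just the simplest thing that works, not something tuned for throughput:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.{ForeachWriter, Row}

// CassandraConnector is serializable, so it can be captured by the
// ForeachWriter and shipped to the executors.
val connector = CassandraConnector(spark.sparkContext.getConf)

val cassandraWriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true

  def process(row: Row): Unit = connector.withSessionDo { session =>
    // Hypothetical schema: adjust the columns to match your table.
    session.execute(
      "INSERT INTO myKeySpace.myTable (key, value) VALUES (?, ?)",
      row.getAs[String]("key"), row.getAs[String]("value"))
  }

  def close(errorOrNull: Throwable): Unit = ()
}

val query = df.writeStream.foreach(cassandraWriter).start()
query.awaitTermination()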

Guiscard answered 15/7, 2017 at 2:21 Comment(8)
Is there any way I can convert my streaming DataFrame into a non-streaming DataFrame?Mitman
No, there is no conversion (at least none that I know of)Guiscard
Do you have a working example in Java? It looks like all solutions come down to CassandraConnector.withSessionDo, which needs a Scala-implemented trait; so no luck with Kotlin or Java.Buyer
is this still true today (I mean 2018)?Briseno
In Spark 2.3 and beyond, when creating a custom sink, it looks like Spark does not allow you to call .write() on your DataFrame in the addBatch method. It throws the error that the OP shared. Does anybody know what the alternative here is?Asarum
I haven't been working with Spark for a while now, although I did manage to complete my small project. I'm not sure if this will help, but you can see how I got past this: github.com/dretta/StockStats/blob/master/src/main/scala/com/… github.com/dretta/StockStats/blob/master/Prototype/… The code doesn't actually work anymore, since the API I was using was modified and the call I'm making to it is deprecated.Mitman
Is this true as of Oct 2018?Argentinaargentine
It is only supported in DSE 6.0, which uses an enhanced version of the OSS code.Guiscard

I know it's an old post; updating it for future reference.

You can process the streaming data as micro-batches with foreachBatch, like below:

def writeToCassandra(writeDF, epochId):
    # Each micro-batch arrives as a regular (non-streaming) DataFrame,
    # so the ordinary batch writer can be used here.
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="table_name", keyspace="keyspacename") \
        .mode("append") \
        .save()

query = sdf3.writeStream \
    .trigger(processingTime="10 seconds") \
    .outputMode("update") \
    .foreachBatch(writeToCassandra) \
    .start()

query.awaitTermination()
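
Note that foreachBatch is available from Spark 2.4 onwards; each micro-batch is handed to the function as an ordinary, non-streaming DataFrame, which is why the batch Cassandra writer works here even though the source does not support streamed writes.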
Capitol answered 5/11, 2019 at 22:38 Comment(1)
It's giving me an error: Failed to find data source: org.apache.spark.sql.cassandra. I'm passing the package using the --packages option.Typical
