UPDATE Cassandra table using spark cassandra connector
I'm facing an issue with the Spark Cassandra connector in Scala while updating a table in my keyspace.

Here is my piece of code:

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a = a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + z

println(query)

val KeySpace = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

KeySpace.sql(query)

When I execute this code, I get this error:

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

Any idea why this is happening? How can I fix this?

Issi answered 5/8, 2015 at 23:13 Comment(5)
What is the result if you run the SQL statement that is generated by your code directly on Cassandra?Principalities
@kerkero: If I run it on Cassandra, it will either update the row if the key is already present, or create a new row for that key if it is not.Issi
Did you define the column which corresponds to "a" in your example as a counter type?Principalities
It is defined. BTW, it's not a counter, it's a set.Issi
Hi @SunilKumarBM, in an arguably biased view I'd recommend using phantom for a normal Cassandra application; the Spark connector is specifically geared towards Spark applications, whereas phantom is meant to be the foundation of any Cassandra-based API.Harragan

Updating a table with a counter column is feasible via the spark-cassandra-connector. You have to use DataFrames and the DataFrameWriter method save with mode "append" (or SaveMode.Append if you prefer). See the code in DataFrameWriter.scala.

For example, given a table:

cqlsh:test> SELECT * FROM name_counter ;

 name    | surname | count
---------+---------+-------
    John |   Smith |   100
   Zhang |     Wei |  1000
 Angelos |   Papas |    10

The code should look like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Deltas to add to each counter; counter writes increment, they don't overwrite
val updateRdd = sc.parallelize(Seq(Row("John",    "Smith", 1L),
                                   Row("Zhang",   "Wei",   2L),
                                   Row("Angelos", "Papas", 3L)))

// Schema must match the Cassandra table: (name, surname) key, count counter
val tblStruct = new StructType(
    Array(StructField("name",    StringType, nullable = false),
          StructField("surname", StringType, nullable = false),
          StructField("count",   LongType,   nullable = false)))

val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct)

// "append" mode increments existing counters instead of rejecting the rows
updateDf.write.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "name_counter"))
  .mode("append")
  .save()

After UPDATE:

 name    | surname | count
---------+---------+-------
    John |   Smith |   101
   Zhang |     Wei |  1002
 Angelos |   Papas |    13

The DataFrame construction can be made simpler by implicitly converting an RDD to a DataFrame: import sqlContext.implicits._ and then call .toDF().
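
For illustration, a minimal sketch of that shortcut against the same test.name_counter table (assuming a SQLContext named sqlContext; .toDF takes the column names):

import sqlContext.implicits._

// Tuples are converted implicitly; toDF() supplies the column names
val updateDf2 = sc.parallelize(Seq(("John", "Smith", 1L),
                                   ("Zhang", "Wei",  2L)))
  .toDF("name", "surname", "count")

// Same append write path as above
updateDf2.write.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "name_counter"))
  .mode("append")
  .save()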

Check the full code for this toy application: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

Since versions are very important here, the above applies to Scala 2.11.7, Spark 1.5.1, spark-cassandra-connector 1.5.0-RC1-s_2.11, and Cassandra 3.0.5. Note that DataFrameWriter is marked @Experimental (it was introduced in Spark 1.4.0).
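
If it helps, a hedged sbt sketch pinning those versions (assumption: the standard Maven coordinates com.datastax.spark %% spark-cassandra-connector; the -s_2.11 artifact mentioned above is the Spark Packages naming of the same release):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-RC1"
)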

Flyover answered 21/4, 2016 at 11:29 Comment(1)
How can I insert a new record or delete one using a DataFrame?Dedal

I believe that you cannot UPDATE natively through the Spark connector. See the documentation:

"The default behavior of the Spark Cassandra Connector is to overwrite collections when inserted into a cassandra table. To override this behavior you can specify a custom mapper with instructions on how you would like the collection to be treated."

So you'll want to actually INSERT a new record with an existing key; in Cassandra an INSERT with an existing primary key acts as an upsert and overwrites the old values.
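
For illustration, a minimal sketch of that approach using the connector's RDD API (saveToCassandra and SomeColumns are part of spark-cassandra-connector; the keyspace, table, and column names below mirror the question and are assumptions):

import com.datastax.spark.connector._

// Writing a row whose primary key (x, y, z) already exists acts as an
// upsert: the non-key set column "a" is overwritten.
sc.parallelize(Seq(("x1", "y1", "z1", Set("newElement"))))
  .saveToCassandra("my_keyspace", "unique_traffic",
                   SomeColumns("x", "y", "z", "a"))

// To add to the set column "a" instead of overwriting it, the connector's
// documented per-column collection behavior can be requested:
sc.parallelize(Seq(("x1", "y1", "z1", Set("anotherElement"))))
  .saveToCassandra("my_keyspace", "unique_traffic",
                   SomeColumns("x", "y", "z", "a" append))

The second write is what the custom-mapper note in the quoted documentation refers to.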

Rento answered 6/8, 2015 at 1:40 Comment(0)
