Inserting Data Into Cassandra table Using Spark DataFrame
I'm using Scala 2.10.5, Cassandra 3.0, and Spark 1.6. I want to insert data into Cassandra, so I tried the basic example:

scala> import com.datastax.spark.connector._
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

This works and inserts the data into Cassandra. Now I have a CSV file that I want to insert into a Cassandra table by matching its schema:

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("age", IntegerType, true)))
val rowRDD = person.map(_.split(",")).map(p => Row(p(0), p(1), p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra

When I use saveToCassandra, I get an error saying saveToCassandra is not a member of personSchemaRDD, so I thought of trying a different way:

 df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()

But I get a "cannot connect to Cassandra on ip:port" error. Can anyone tell me the best way to do this? I need to periodically save data to Cassandra from the files.

Numen answered 20/12, 2016 at 17:42 Comment(0)

sqlContext.applySchema(...) returns a DataFrame, and a DataFrame does not have the saveToCassandra method.

You could use the .write method with it:

val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()

If you want to use the saveToCassandra method, the best way is to have a schema-aware RDD, using a case class:

import com.datastax.spark.connector._

case class Person(firstName: String, lastName: String, age: Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0), p(1), p(2).toInt))
rowRDD.saveToCassandra(keyspace, table)

The DataFrame write method should work. Check that you have configured your context correctly.
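For instance, the connector reads its contact point from the Spark configuration; a minimal sketch, where the host and port are placeholders for your own cluster:

    import org.apache.spark.{SparkConf, SparkContext}

    // Placeholder contact point: replace with the address of a reachable Cassandra node.
    // These configuration keys are read by the Spark Cassandra Connector on connect.
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.cassandra.connection.port", "9042")
    val sc = new SparkContext(conf)

When using spark-shell, the same setting can be passed on the command line with --conf spark.cassandra.connection.host=... instead.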

Belcher answered 20/12, 2016 at 19:13 Comment(6)
How can I convert one of the elements in Row() to a timestamp? `val rowRDD = input.map(_.split(",")).map(p => Row(p(0), getTimestamp(p(1)), p(2)))` I want the yyyy-MM-dd HH:mm:ss format. – Numen
@Numen You would be better off mapping timestamps to java.util.Date or joda-time DateTime to avoid format issues. – Belcher
When using com.databricks.spark.csv, is there any option to handle "NA"? My error: Caused by: java.text.ParseException: Unparseable number: "NA" – Numen
@Numen Don't drag questions out in the comments. If you have a new issue, consider asking a new question. – Belcher
It worked; it was a connection issue. I changed subnet_address in cassandra.yaml. – Numen
Where are you connecting to Cassandra? I have a df from Oracle and now have to save it to Cassandra. How should I get the connection I need to use for the save? – Phobia
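Regarding the timestamp conversion asked about in the comments, here is a minimal sketch of such a getTimestamp helper; the helper name and the yyyy-MM-dd HH:mm:ss input format are assumptions taken from the comment, not part of any library:

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import org.apache.spark.sql.Row

    // Hypothetical helper: parses "yyyy-MM-dd HH:mm:ss" strings into java.sql.Timestamp,
    // which the connector maps to a Cassandra timestamp column.
    def getTimestamp(s: String): Option[Timestamp] = {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      try Some(new Timestamp(format.parse(s).getTime))
      catch { case _: java.text.ParseException => None } // unparseable input such as "NA" yields None
    }

    val rowRDD = input.map(_.split(","))
      .map(p => Row(p(0), getTimestamp(p(1)).orNull, p(2)))

Returning an Option and mapping it to orNull also gives a place to handle the "NA" values mentioned later in the thread instead of throwing a ParseException.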

I am putting my code here for saving a Spark dataset into a Cassandra table using Spark with Java.

private static void readBigEmptable(SparkSession sparkSession) {
    String cassandraEmpColumns = "id,name,salary,age,city";
    Dataset<Row> bigDataset = sparkSession.sql("select * from big_emp");
    // Generate the schema for the output rows
    List<StructField> fields = new ArrayList<>();
    for (String fieldName : cassandraEmpColumns.split(",")) {
        fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
    }
    StructType schemaStructure = DataTypes.createStructType(fields);
    // Convert the dataset to an RDD to operate on each Row
    JavaRDD<Row> bigRDD = bigDataset.toJavaRDD();
    JavaRDD<Row> resultRDD = bigRDD.map(new Function<Row, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Row row) throws Exception {
            return RowFactory.create(row.getAs("id"), row.getAs("name"), row.getAs("salary"),
                    row.getAs("age"), row.getAs("city"));
        }
    });
    Dataset<Row> empDs = sparkSession.createDataFrame(resultRDD, schemaStructure);
    empDs.show();
    writeToCassandraTable(empDs);
}

private static void writeToCassandraTable(Dataset<Row> dataset) {
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put("keyspace", "test_keyspace");
    tableProperties.put("table", "emp_test");
    tableProperties.put("confirm.truncate", "true");
    dataset.write().format("org.apache.spark.sql.cassandra").options(tableProperties)
            .mode(SaveMode.Overwrite).save();
}

Note: if we use mode(SaveMode.Overwrite), we must also set tableProperties.put("confirm.truncate", "true"); otherwise the save fails with an error.

SaveMode.Append

  • Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

SaveMode.ErrorIfExists

  • ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

SaveMode.Ignore

  • Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.

SaveMode.Overwrite

  • Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
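Since the original question needs periodic saves from files, SaveMode.Append is the mode that adds rows without truncating the table. Reusing the keyspace and table names from the accepted answer, a sketch in Scala:

    import org.apache.spark.sql.SaveMode

    // Append rows to the existing Cassandra table instead of overwriting it;
    // no confirm.truncate option is needed in this mode.
    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "words_copy", "keyspace" -> "test"))
      .mode(SaveMode.Append)
      .save()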
Petaloid answered 10/7, 2018 at 17:30 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.