I had the same issue and found a solution by creating a subclass of the `InputDStream` class. It is necessary to define the `start()` and `compute()` methods (plus `stop()`, which may be a no-op). `start()` can be used for preparation. The main logic resides in `compute()`, which shall return `Option[RDD[T]]`.
To make the class flexible, the `InputStreamQuery` trait is defined.
/**
 * Describes how a Cassandra-backed input stream pages through a table: which partition
 * to start from, how to advance to the next partition, how to resume inside a partition,
 * and how many rows to fetch per batch.
 *
 * @tparam T row type produced by the stream
 */
trait InputStreamQuery[T] {

  /**
   * Where-clause condition for the partition key, paired with the initial key value,
   * e.g. `("date = ?", "2017-10-01")`.
   */
  def partitionCond: (String, Any)

  /** Returns the partition key that follows `v`, or `None` when there is no next partition (yet). */
  def nextValue(v: Any): Option[Any]

  /**
   * Where-clause condition for the clustering key, paired with a function that extracts
   * the clustering key from a row (used to resume after the last row already read).
   */
  def whereCond: (String, (T) => Any)

  /** Maximum number of rows fetched per batch. */
  def batchSize: Int
}
For the Cassandra table `keyspace.test`, create a materialized view `test_by_date` that reorganizes the table by the partitioning key `date`:
-- Replace `keyspace` with your keyspace name (KEYSPACE is a reserved word in CQL).
-- Base table: rows keyed only by a timeuuid.
CREATE TABLE IF NOT EXISTS keyspace.test
  (id timeuuid, date text, value text, PRIMARY KEY (id));

-- The same rows partitioned by `date` and clustered by `id`, so one day can be read in id order.
-- A materialized view must select from the base table and restrict every primary-key
-- column of the view with IS NOT NULL.
CREATE MATERIALIZED VIEW IF NOT EXISTS keyspace.test_by_date AS
  SELECT * FROM keyspace.test
  WHERE date IS NOT NULL AND id IS NOT NULL
  PRIMARY KEY (date, id);
One possible implementation for the `test` table is:
/**
 * A row of `keyspace.test` / `keyspace.test_by_date`.
 * A case class so that its fields (e.g. `id`) are public accessors and it has value equality.
 */
final case class Test(id: UUID, date: String, value: String)
/**
 * Example `InputStreamQuery` for the `test_by_date` view.
 *
 * - Partitions are days formatted as `uuuu-MM-dd`, starting at 2017-10-01.
 * - Rows inside a day are resumed by `id`.
 * - 10 rows are fetched per batch.
 */
trait InputStreamQueryTest extends InputStreamQuery[Test] {
  // pattern of the `date` partition key
  val dateFormat = "uuuu-MM-dd"

  // set batch size as 10 records
  override def batchSize: Int = 10

  // partitioning key condition: query string and initial value
  override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")

  // clustering key condition: query string and function to get the clustering key from a row
  override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)

  /**
   * Returns the day after `v`, e.g. `2017-10-02` for `2017-10-01`.
   * Returns `None` once that day lies after today (`LocalDate.now()`, system default time zone).
   *
   * @throws IllegalArgumentException if `v` is not a `String`
   */
  override def nextValue(v: Any): Option[Any] = {
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter
    // Built per call rather than stored in a field: DateTimeFormatter is not Serializable,
    // and the DStream this trait is mixed into may be serialized (e.g. for checkpointing).
    val formatter = DateTimeFormatter.ofPattern(dateFormat)
    v match {
      case day: String =>
        val nextDate = LocalDate.parse(day, formatter).plusDays(1)
        if (nextDate.isAfter(LocalDate.now())) None
        else Some(nextDate.format(formatter))
      case other =>
        throw new IllegalArgumentException(
          s"expected a date string in format $dateFormat, got: $other")
    }
  }
}
It can be used in the `CassandraInputStream` class as follows:
/**
 * A Spark Streaming input stream that reads a Cassandra table in batches of at most
 * `batchSize` rows.
 *
 * - Partition keys are walked in the order given by `nextValue`.
 * - Reading inside a partition resumes after the last row already emitted.
 * - The paging strategy is supplied by mixing in an `InputStreamQuery` implementation.
 *
 * @param _ssc     streaming context the stream belongs to
 * @param keyspace Cassandra keyspace to read from
 * @param table    table (or materialized view) to read from
 */
class CassandraInputStream[T: ClassTag]
    (_ssc: StreamingContext, keyspace: String, table: String)
    (implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T])
  extends InputDStream[T](_ssc) with InputStreamQuery[T] {

  import scala.annotation.tailrec

  // Last row emitted from the current partition; resume point for the next batch.
  var lastElm: Option[T] = None
  // Partition key currently being read.
  var partitionKey: Any = _

  /**
   * Positions `partitionKey` on the first partition, from the initial value onwards, that
   * contains rows. If none does, it uses the last key probed.
   */
  override def start(): Unit = {
    val (cql, initial) = partitionCond

    // Tail-recursive so a long run of empty partitions (e.g. many days) cannot overflow the stack.
    @tailrec
    def findStartValue(value: Any): Any = {
      val probe = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)
      if (probe.cassandraCount() > 0) value
      else nextValue(value) match {
        case Some(next) => findStartValue(next)
        case None       => value
      }
    }

    partitionKey = findStartValue(initial)
  }

  override def stop(): Unit = {}

  /**
   * Returns the next batch of at most `batchSize` rows, in ascending clustering-key order.
   * Returns `None` when no rows are available yet.
   *
   * When the current partition is exhausted, advances `partitionKey` through `nextValue`.
   * This continues until a partition with rows is found or `nextValue` returns `None`.
   */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val (cql, _) = partitionCond
    val (wh, whKey) = whereCond

    @tailrec
    def fetchNext(patKey: Any): Option[RDD[T]] = {
      // query with partitioning condition, resuming after the last emitted row if any
      val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey)
      val page = lastElm
        .map(last => query.where(wh, whKey(last)))
        .getOrElse(query)
        .withAscOrder
        .limit(batchSize)

      // Read once and emit exactly these rows. Returning the lazy Cassandra RDD would
      // re-query later, and rows inserted meanwhile would be emitted now and again next batch.
      val rows = page.collect()
      rows.lastOption match {
        case Some(last) =>
          lastElm = Some(last)
          Some(_ssc.sparkContext.parallelize(rows.toSeq))
        case None =>
          // current partition exhausted: find the next partition key which stores data
          nextValue(patKey) match {
            case Some(next) =>
              partitionKey = next
              // the clustering-key resume point belongs to the previous partition
              lastElm = None
              fetchNext(next)
            case None => None
          }
      }
    }

    fetchNext(partitionKey)
  }
}
Combining all the classes:
// appName, master, outputKeyspace and outputTable are placeholders for your own settings.
val conf = new SparkConf().setAppName(appName).setMaster(master)
// compute() is invoked once per 10-second batch interval
val ssc = new StreamingContext(conf, Seconds(10))
val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest

// `dstream.map(println)` would be lazy and would turn the stream into DStream[Unit].
// Print each record as an output action instead, and keep the records for saving.
dstream.foreachRDD(_.foreach(println))
dstream.saveToCassandra(outputKeyspace, outputTable)

// Nothing is read until the streaming context is started.
ssc.start()
ssc.awaitTermination()