How to perform Unit testing on Spark Structured Streaming?
I would like to know about the unit testing side of Spark Structured Streaming. My scenario is: I am getting data from Kafka, consuming it with Spark Structured Streaming, and applying some transformations on top of the data.

I am not sure how I can test this using Scala and Spark. Can someone tell me how to do unit testing in Structured Streaming using Scala? I am new to streaming.

Fillmore asked 4/7, 2019 at 20:47 Comment(0)

tl;dr Use MemoryStream to add events and the memory sink for the output.

The following code should help to get started:

import org.apache.spark.sql.execution.streaming.MemoryStream

// MemoryStream needs an implicit SQLContext and an Encoder for the event type
implicit val sqlCtx = spark.sqlContext
import spark.implicits._

val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use sessions event stream to apply required transformations
val transformedSessions = ...

val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start

// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])

// check the output
// The output is in queryName table
// The following code simply shows the result
spark
  .table(queryName)
  .show(truncate = false)
Eisenhower answered 5/7, 2019 at 22:25 Comment(1)
Hi @Jacek Laskowski, the memory stream seems to keep on running; it won't stop even on streamingQuery.stop(). Could you take a look at this as well: #65987282 – Archaism

So, I tried to implement the answer from @Jacek, but I couldn't figure out how to create the eventGen object, nor how to test a small streaming application that writes data to the console. I am also using MemoryStream, and here I show a small working example.

The class that I am testing is:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession, functions}

object StreamingDataFrames {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(StreamingDataFrames.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()

    val lines = readData(spark, "socket")
    val streamingQuery = writeData(lines)
    streamingQuery.awaitTermination()
  }

  def readData(spark: SparkSession, source: String = "socket"): DataFrame = {
    val lines: DataFrame = spark.readStream
      .format(source)
      .option("host", "localhost")
      .option("port", 12345)
      .load()
    lines
  }

  def writeData(df: DataFrame,
                sink: String = "console",
                queryName: String = "calleventaggs",
                outputMode: String = "append"): StreamingQuery = {

    println(s"Is this a streaming data frame: ${df.isStreaming}")

    val shortLines: DataFrame = df.filter(functions.length(col("value")) >= 3)

    val query = shortLines.writeStream
      .format(sink)
      .queryName(queryName)
      .outputMode(outputMode)
      .start()
    query
  }
}
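
Note that running the main method requires something listening on localhost:12345 for the socket source to connect to; the standard trick from the Spark docs is to start netcat (nc -lk 12345) in another terminal and type lines into it. The unit test below avoids this by swapping the source for a MemoryStream.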

I test only the writeData method. This is why I split the query into two methods. Here is the Spec to test the class. I use a SharedSparkSession class to facilitate opening and closing the Spark context, like it is shown here.
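
The SharedSparkSession trait itself is not reproduced in this answer. A minimal sketch of what such a trait could look like, assuming ScalaTest's BeforeAndAfterAll; this is an assumption, not the author's actual class, but it shows one way to provide the sqlImplicits value the Spec imports below:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical SharedSparkSession: one local session per suite, stopped after all tests
trait SharedSparkSession extends BeforeAndAfterAll { self: Suite =>

  lazy val sparkSession: SparkSession = SparkSession.builder()
    .appName("spec")
    .master("local[2]")
    .getOrCreate()

  // Stable reference to the session's implicits, so specs can import sqlImplicits._
  lazy val sqlImplicits = sparkSession.implicits

  override def afterAll(): Unit = {
    try sparkSession.stop()
    finally super.afterAll()
  }
}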

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}
import org.github.explore.spark.SharedSparkSession
import org.scalatest.funsuite.AnyFunSuite

class StreamingDataFramesSpec extends AnyFunSuite with SharedSparkSession {

  test("spark structured streaming can read from memory socket") {

    // sqlCtx is required implicitly by MemoryStream; sqlImplicits comes from SharedSparkSession
    implicit val sqlCtx = sparkSession.sqlContext

    import sqlImplicits._

    val events = MemoryStream[String]
    val queryName: String = "calleventaggs"

    // Add events to MemoryStream as if they came from Kafka
    val batch = Seq(
      "this is a value to read",
      "and this is another value"
    )
    val currentOffset = events.addData(batch)

    val streamingQuery = StreamingDataFrames.writeData(events.toDF(), "memory", queryName)

    streamingQuery.processAllAvailable()
    events.commit(currentOffset.asInstanceOf[LongOffset])

    val result: DataFrame = sparkSession.table(queryName)
    result.show

    streamingQuery.awaitTermination(1000L)
    assertResult(batch.size)(result.count)

    val values = result.take(2)
    assertResult(batch(0))(values(0).getString(0))
    assertResult(batch(1))(values(1).getString(0))
  }
}
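
One caveat: since SharedSparkSession keeps a single session for the whole suite, it is worth stopping the query at the end of each test (streamingQuery.stop()), so the memory sink's in-memory table and the running query do not leak into later tests.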
Ianthe answered 5/3, 2021 at 16:34 Comment(2)
github.com/holdenk/spark-testing-base should have support for structured streaming – Shelton
Yes. I agree. I was using it but it was so simple to implement my own interface, like here (#39151689), that I decided to remove this lib. – Ianthe
