To mock Kafka in Python unit tests that are run from SBT test tasks, I did the following. PySpark must be installed.
In build.sbt, define a task that should be run together with the tests:
// Custom task key for running the Python unit tests from sbt.
val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")
// Command line used to launch the Python test module.
val command = "python3 -m unittest app_test.py"
// Directory (relative path) in which the Python process is started.
val workingDirectory = new File("./project/src/main/python")

testPythonTask := {
  val s: TaskStreams = streams.value
  s.log.info("Executing task testPython")

  // Collect all jar paths from the project's runtime classpath; they are handed to PySpark
  // so that org.apache.spark.streaming.kafka.KafkaTestUtils can be used from Python.
  val jarPaths = (fullClasspath in Runtime).value
    .map(_.data.getCanonicalPath)
    .filter(_.endsWith(".jar"))
    .mkString(",")

  val exitCode = Process(
    command,
    workingDirectory,
    "PYSPARK_SUBMIT_ARGS" -> s"--jars $jarPaths pyspark-shell",
    "PYSPARK_PYTHON" -> "python3"
  ) ! s.log

  // `!` only returns the exit status; fail the task explicitly so that failing
  // Python tests also fail the sbt build instead of being silently ignored.
  if (exitCode != 0) {
    sys.error(s"Python tests failed with exit code $exitCode")
  }
}
// Attach the custom Python test task to the default test tasks.
// NOTE(review): sbt treats `.value` calls as task dependencies rather than sequential
// statements, so the Python tests are not guaranteed to run before the Scala tests.
test in Test := {
  testPythonTask.value
  (test in Test).value
}
// `testOnly` is an input task: `.evaluated` parses the command-line arguments and runs it,
// whereas `.value` would only yield the InputTask without executing the selected tests.
testOnly in Test := {
  testPythonTask.value
  (testOnly in Test).evaluated
}
In the Python test case (app_test.py):
import random
import unittest
from itertools import chain
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase
class KafkaStreamTests(PySparkStreamingTestCase):
    """Tests for the Python Kafka stream API, backed by the JVM-side KafkaTestUtils helper."""

    timeout = 20  # seconds
    duration = 1

    def setUp(self):
        """Create the base streaming fixture, then load and set up KafkaTestUtils."""
        super(KafkaStreamTests, self).setUp()
        # Load the helper through the JVM context class loader; the class is expected
        # to come from the jars passed to PySpark via PYSPARK_SUBMIT_ARGS.
        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
        self._kafkaTestUtils.setup()

    def tearDown(self):
        """Tear down KafkaTestUtils and always run the base class cleanup."""
        try:
            if self._kafkaTestUtils is not None:
                self._kafkaTestUtils.teardown()
                self._kafkaTestUtils = None
        finally:
            # Run the base cleanup even when the Kafka teardown raises, so the
            # fixture created in setUp is not left behind.
            super(KafkaStreamTests, self).tearDown()

    def _randomTopic(self):
        """Return a topic name of the form ``topic-<n>`` with n drawn from [0, 10000]."""
        return "topic-%d" % random.randint(0, 10000)

    def _validateStreamResult(self, sendData, stream):
        """Count the values received on ``stream`` and assert the counts equal ``sendData``.

        :param sendData: dict mapping each message to the number of times it was sent.
        :param stream: DStream of pairs; only the second element of each pair is counted.
        """
        result = {}
        # The second argument to _collect is the total number of messages sent
        # (presumably the number of records to wait for -- confirm against the base class).
        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                   sum(sendData.values()))):
            result[i] = result.get(i, 0) + 1

        self.assertEqual(sendData, result)

    def test_kafka_stream(self):
        """Test the Python Kafka stream API."""
        topic = self._randomTopic()
        sendData = {"a": 3, "b": 5, "c": 10}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                         "test-streaming-consumer", {topic: 1},
                                         {"auto.offset.reset": "smallest"})
        self._validateStreamResult(sendData, stream)
More examples for Flume, Kinesis, and other sources can be found in the
pyspark.streaming.tests module.