Spark 3.x Integration with Kafka in Python

Kafka with spark-streaming throws an error:

from pyspark.streaming.kafka import KafkaUtils
ImportError: No module named kafka

I have already set up a Kafka broker and a working Spark environment with one master and one worker.

import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils



if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,1)
    kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
    lines = kvs.map(lambda x: x[1])
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    ssc.start()
    ssc.awaitTermination()

Judging from the error, I assume something related to Kafka is missing, most likely a version mismatch. Can anyone help with this?

Spark version: 3.0.0-preview2

I execute with:

/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077
Flytrap answered 19/5, 2020 at 13:4

According to the Spark Streaming + Kafka Integration Guide:

"Kafka 0.8 support is deprecated as of Spark 2.3.0."

In addition, the guide's comparison of the two integration packages shows that Python is not supported for Kafka 0.10 (and higher).

In your case you will have to use Spark 2.4 to get your code running, because the pyspark.streaming.kafka module that provides KafkaUtils was removed in Spark 3.0.
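A quick way to tell which side of that split an installation is on is to look at the Spark version. The sketch below is only illustrative: `python_kafka_api` is a hypothetical helper name, not part of PySpark, and it assumes the cut-off is Spark 3.0, where the Kafka 0.8 DStream integration was removed.

```python
def python_kafka_api(spark_version):
    """Return which Kafka integration PySpark can use for a Spark version.

    Hypothetical helper for illustration only; not part of PySpark.
    """
    # Keep only the major component, e.g. "3.0.0-preview2" -> 3
    major = int(spark_version.split(".")[0])
    if major >= 3:
        # pyspark.streaming.kafka (KafkaUtils) is no longer available here
        return "structured-streaming"
    # Older releases still ship the Kafka 0.8 DStream API for Python
    return "dstream-kafka-0-8"


print(python_kafka_api("3.0.0-preview2"))
print(python_kafka_api("2.4.5"))
```

In a real script the argument could be `pyspark.__version__`.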

PySpark supports Structured Streaming

If you plan to use the latest version of Spark (e.g. 3.x) and still want to integrate Spark with Kafka in Python you can use Structured Streaming. You will find detailed instructions on how to use the Python API in the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):

Reading Data from Kafka

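from pyspark.sql import SparkSession

# Assumes the spark-sql-kafka-0-10 package is available to this session
spark = SparkSession.builder.appName("KafkaRead").getOrCreate()

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
# Kafka delivers key and value as binary, so cast both to strings
messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")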

Writing Data to Kafka

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
# The Kafka sink needs a checkpoint location; the path below is a placeholder
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", "/path/to/checkpoint/dir") \
  .start()
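
For the job in the question, a Structured Streaming port might look roughly like the sketch below. It rests on several assumptions: Spark 3.0.0 built for Scala 2.12, a connector coordinate derived from those two versions, and a running total printed to the console rather than the per-batch count that the original DStream code produced. `kafka_package` and `run_message_count` are hypothetical names introduced here for illustration.

```python
def kafka_package(spark_version="3.0.0", scala_version="2.12"):
    """Maven coordinate of the Structured Streaming Kafka connector."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version
    )


def run_message_count(bootstrap_servers, topic):
    """Print a running count of the messages read from `topic`."""
    # Imported lazily so kafka_package() works without PySpark installed
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("SparkStreamAISfromKAFKA")
        # Asks Spark to fetch the connector when the session starts
        .config("spark.jars.packages", kafka_package())
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")

    # Each Kafka record arrives with binary key/value columns
    messages = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(value AS STRING) AS value")
    )

    # Global aggregation: one row holding the total number of messages so far
    query = (
        messages.groupBy().count()
        .writeStream
        .outputMode("complete")
        .format("console")
        .trigger(processingTime="1 second")
        .start()
    )
    query.awaitTermination()
```

A call such as `run_message_count("my-kafka-broker:9092", "enriched_ais_messages")` would start the stream; the port 9092 is a guess, since the question gives only the host name. The `spark.jars.packages` setting is only picked up when the script starts the JVM itself, so when launching through spark-submit the same coordinate would go in `--packages` instead.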
Marchetti answered 23/5, 2020 at 21:47
