Connecting PySpark with Kafka

I'm having trouble understanding how to connect Kafka and PySpark.

I have a Kafka installation on Windows 10 with a topic that is streaming data nicely. I've installed PySpark, which runs properly: I'm able to create a test DataFrame without any problem.

But when I try to connect to the Kafka stream, it gives me this error:

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming- Kafka Integration Guide".

The Spark documentation is not really helpful - it says: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

For Python applications, you need to add this above library and its dependencies when deploying your application. See the Deploying subsection below.

And then when you go to the Deploying section, it says:

As with any Spark applications, spark-submit is used to launch your application. spark-sql-kafka-0-10_2.12 and its dependencies can be directly added to spark-submit using --packages, such as, ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ...

I'm developing an app, I don't want to deploy it. Where and how do I add these dependencies if I'm developing a PySpark app?

I tried several tutorials and ended up even more confused.

I saw an answer saying that

"You need to add kafka-clients JAR to your --packages".so-answer

A few more steps would be useful, because this is unclear for someone who is new to this.

versions:

  • kafka 2.13-2.8.1
  • spark 3.1.2
  • java 11.0.12

All environment variables and paths are correctly set.

EDIT

I've loaded:

   os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'

as suggested, but I'm still getting the same error. I've triple-checked the Kafka, Scala and Spark versions and tried various combinations, but it didn't work; I'm still getting the same error:

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming-Kafka Integration Guide".

EDIT 2

I installed the latest Spark 3.2.0 and Hadoop 3.3.1, and Kafka version kafka_2.12-2.8.1. I changed all the environment variables and tested Spark and Kafka - both work properly.

My environment variable looks like this now:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'

Still no luck, I get the same error :(

Kilimanjaro answered 16/12, 2021 at 6:12 Comment(0)

Spark documentation is not really helpful - it says ... artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

Yes, that is correct... but you're looking at the documentation for the latest version of Spark.

Instead, you've mentioned

versions:

  • spark 3.1.2

Have you tried looking at the version-specific docs?

In other words, you want the matching spark-sql-kafka version of 3.1.2.

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

Or in Python,

from pyspark.sql import SparkSession

scala_version = '2.12'
spark_version = '3.1.2'
# TODO: Ensure the values above match your Spark and Scala versions
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.1'
]
spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

Or with an env-var

import os

spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)

# init spark here
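
If you go the env-var route, it has to be set before the session is created. A minimal sketch of the full ordering, assuming the script is started with plain python rather than spark-submit (in which case PYSPARK_SUBMIT_ARGS usually needs to end with pyspark-shell):

import os

# Must be set before the SparkSession (and the JVM behind it) is created
scala_version = '2.12'
spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f'--packages org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version},'
    'org.apache.kafka:kafka-clients:2.8.1 pyspark-shell'
)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .getOrCreate()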

need to add this above library and its dependencies

As you found in my previous answer, also append the kafka-clients package using a comma-separated list.

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1
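
Put together, a session configured with both packages and a minimal read from the topic could look like the sketch below. The broker address localhost:9092 and the topic name my_topic are assumptions, so swap in your own.

from pyspark.sql import SparkSession

packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2',
    'org.apache.kafka:kafka-clients:2.8.1',
]
spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

# Kafka rows arrive with key/value as binary, so cast them to strings
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker
      .option("subscribe", "my_topic")                       # assumed topic name
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

# For a quick local check, print each micro-batch to the console
query = df.writeStream.format("console").start()
query.awaitTermination()

Note that spark.jars.packages only takes effect when the session is created, so if the kafka source still isn't found, restart the Python process and build a fresh session with this config in place.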


I'm developing app, I don't want to deploy it.

"Deploy" is Spark terminology. Running locally is still a "deployment"

Consecration answered 16/12, 2021 at 15:55 Comment(9)
Thank you so much for this! One thing is still not clear to me - how do I 'append the kafka-clients package'? Where is that package? – Kilimanjaro
Also spark.apache.org/docs/latest/… says: "Do not manually add dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has the appropriate transitive dependencies already, and different versions may be incompatible in hard to diagnose ways." – Kilimanjaro
Last I checked, PySpark doesn't pull the transitive dependencies (thus, the reported error in the answer you linked); it only does so when using a tool like SBT / Maven / Gradle. As for "where" - Maven Central. You "append" a string to the packages argument, as shown. – Consecration
I've tried as you suggested - see the edit on my question. It's still not working :( – Kilimanjaro
You put the {} formatter at the wrong spot. It only goes to the Spark dependencies, not at the very end. – Consecration
I've edited the answer - not working again. The problem is that my Kafka is for Scala version 2.13 and Spark requires 2.12. I'll reinstall Kafka and try again... – Kilimanjaro
I'm using the latest Spark 3.2.0 and kafka_2.12-2.8.1, but still no luck. – Kilimanjaro
The Scala API version of the Kafka broker shouldn't matter. As long as your Spark version and Spark's Scala version are the same as in the packages argument, it'll work. Where are you putting the os.environ call in relation to the Spark code? Are you importing findspark, perhaps? – Consecration
@user12 It's correct, and has been answered other times too. – Consecration
