PySpark - NoClassDefFoundError: kafka/common/TopicAndPartition

I'm running Spark version 2.3.0.2.6.5.1175-1 with Python 3.6.8 on Ambari. While submitting the application, I get the following logs in stderr:

22/06/15 12:29:31 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "Thread-10" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetPublicMethods(Class.java:2902)
    at java.lang.Class.getMethods(Class.java:1615)
    at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 12 more
22/06/15 12:29:33 ERROR ApplicationMaster: User application exited with status 1

The following are the stdout logs:

    ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "read_stream.py", line 13, in <module>
    stream.start(callback=callback)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 125, in start
    output = self.readStream().foreachRDD(lambda rdd: self.process(rdd, callback))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 70, in readStream
    messageHandler=lambda x: (x.topic, x.message))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/streaming/kafka.py", line 150, in createDirectStream
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o93.createDirectStreamWithMessageHandler

I've provided the following JAR files:

spark-sql-kafka-0-10_2.11-2.3.0.jar

spark-streaming-kafka-0-8_2.11-2.3.0.jar

metrics-core-2.2.0.jar

Is this some kind of configuration issue or is there something wrong in the code?

Edit: I am using Livy to submit the job to the cluster. Below is the POST request code:

headers = {
    'X-Requested-By': 'admin',
    'Content-Type': 'application/json',
}

data = {
    "numExecutors": stream['executors'],
    "executorCores": stream['cores'],
    "executorMemory": stream['memory'],
    "driverMemory": "2g",
    "file": stream['path'],
    "queue": "default",
    "pyFiles": [
        "hdfs:///user/bds/elastic_search/es_pyspark_handler.py",
        "hdfs:///user/bds/realtime/OwlsenseStream.py"
    ],
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.yammer.metrics:metrics-core:2.2.0"
    },
    "name": stream['name']
}

data = json.dumps(data)

response = requests.post(url=f"http://{IP_ADDRESS}:8998/batches", headers=headers, data=data,
                         verify=False)
Closefisted answered 15/6, 2022 at 7:52 Comment(0)

I was unable to run the code on these versions; there was probably something wrong with the JAR versions. I changed the versions to the following:

  • Spark version 2.3.2

  • Kafka 2.8

  • PySpark 2.3.2

  • Python 3.7

  • Java 1.8.*

I used the following packages:

--packages com.yammer.metrics:metrics-core:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 
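
Since the job in the question is submitted through Livy rather than spark-submit, the same package coordinates can be passed through the batch request's spark.jars.packages conf instead of the --packages flag. A minimal sketch of the relevant part of the POST body (the stream dict and IP_ADDRESS are assumed to be the same as in the question):

import json
import requests

# Sketch only: same coordinates as the --packages line above,
# passed through Livy's spark.jars.packages conf.
data = {
    "file": stream['path'],
    "conf": {
        "spark.jars.packages": "com.yammer.metrics:metrics-core:2.2.0,"
                               "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2"
    },
    "name": stream['name']
}

response = requests.post(
    url=f"http://{IP_ADDRESS}:8998/batches",
    headers={'X-Requested-By': 'admin', 'Content-Type': 'application/json'},
    data=json.dumps(data)
)
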
Closefisted answered 23/6, 2022 at 6:29 Comment(4)
As mentioned, you don't need both spark-streaming-kafka and spark-sql-kafka. – Templia
Yeah, I removed "spark-sql-kafka" from the packages. – Closefisted
Well, Spark Streaming Kafka integration is considered deprecated, so you should actually keep that and remove the other. – Templia
Thanks a lot. I'll update the answer as well! – Closefisted

You also need to add the kafka-clients dependency.

And don't add JARs manually; use the packages option to download them from Maven. You also don't need both Spark Streaming and Spark Structured Streaming, pick only one...

from pyspark.sql import SparkSession

# TODO: Ensure these match cluster environment
scala_version = '2.11'  
spark_version = '2.3.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0'  # TODO: match Kafka version
]
spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()

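With the spark-sql-kafka package on the classpath, the stream itself would then be read through Structured Streaming rather than the old KafkaUtils.createDirectStream API shown in the traceback. A rough sketch, with the broker address and topic name as placeholders:

# Sketch: Structured Streaming read from Kafka, replacing createDirectStream.
# "localhost:9092" and "my-topic" are placeholders for your brokers and topic.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my-topic") \
    .load()

# Kafka keys and values arrive as binary, so cast them before processing.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
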
Also, HDP 2.6.5 is very old, and out of support. I suggest you upgrade to something else; for example, use Apache Zeppelin or Jupyter to run Spark notebooks.

Templia answered 16/6, 2022 at 0:23 Comment(10)
First of all, thanks a lot for your reply. I've tried using spark.jars.packages but it still didn't work. Can you please guide me on how to add the kafka-clients dependency? I'm using Livy to send the spark-submit command to the cluster. I've added the POST arguments to the question as well. – Closefisted
What exactly didn't work? The conf in your question doesn't have the package org.apache.kafka:kafka-clients. I'd suggest defining your dependencies directly in the Python code, as shown, not with Livy. – Templia
I tried defining the dependencies directly from Python but it didn't work and showed the same error. I've added kafka-clients to the conf but I am still getting the same error in stdout and stderr. The same code is running on another cluster but I am unable to make it run on the new cluster. I am missing something in the dependencies but I can't figure out what it is. – Closefisted
The dependencies need to be downloaded from the Maven repo, so your Spark workers (or maybe Livy) need internet access. You should see logs on the Spark workers indicating whether they're trying to download those packages and whether they fail. If you cannot allow your cluster to be exposed to the internet, then you will need to copy the JARs to each worker yourself. You can also try lowering the Kafka JAR version to around 0.11.0. – Templia
I've checked the Livy logs and the JAR files are downloaded successfully from the Maven repo, but the error remains. Internet access is enabled on the cluster. Also, this same code is running on a different cluster. Is there any way to check what the configuration differences are between the two machines? – Closefisted
It's not Livy that needs the Kafka classes at runtime though, it's the Spark executors, which should be separate from Livy (e.g. the YARN NodeManagers)... The only differences would be in the Spark conf directory. – Templia
I've compared all the files in the Spark conf directory and they are all the same. Can you please suggest anything else I can check? – Closefisted
Are you using YARN? It could be a YARN config file difference... Are the clusters in different networks? It could be a networking issue. Is the .m2 folder maybe corrupted on the Spark executors? Maybe try deleting the files for the Kafka dependency on disk and have it download again. – Templia
Yes, I am using YARN, and the clusters are on the same network. I've compared the YARN files as well, which are also the same. I guess we'll never know the solution to this :(. Btw, thanks a lot for your help. – Closefisted
If you were using Scala or Java, you could package the Kafka classes as part of the application. For Python, they have to be downloaded. Scala code will also run faster than PySpark, so maybe try that. – Templia
