I'm running Spark version 2.3.0.2.6.5.1175-1 with Python 3.6.8 on Ambari. When I submit the application, I get the following logs in stderr:
22/06/15 12:29:31 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "Thread-10" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetPublicMethods(Class.java:2902)
	at java.lang.Class.getMethods(Class.java:1615)
	at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 12 more
22/06/15 12:29:33 ERROR ApplicationMaster: User application exited with status 1
Following are the stdout logs:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1062, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 908, in send_command
response = connection.send_command(command)
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1067, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
File "read_stream.py", line 13, in <module>
stream.start(callback=callback)
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 125, in start
output = self.readStream().foreachRDD(lambda rdd: self.process(rdd, callback))
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 70, in readStream
messageHandler=lambda x: (x.topic, x.message))
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/streaming/kafka.py", line 150, in createDirectStream
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o93.createDirectStreamWithMessageHandler
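For context, the failing call in OwlsenseStream.py is essentially the following (a minimal sketch reconstructed from the traceback; the topic and broker values are placeholders, not my real configuration):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="OwlsenseStream")
ssc = StreamingContext(sc, 5)  # 5-second batch interval (placeholder)

# Passing a messageHandler is what makes PySpark call
# createDirectStreamWithMessageHandler on the JVM side.
stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["my_topic"],                                  # placeholder
    kafkaParams={"metadata.broker.list": "broker:9092"},  # placeholder
    messageHandler=lambda x: (x.topic, x.message),
)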
I'm providing the following jar files:
spark-sql-kafka-0-10_2.11-2.3.0.jar
spark-streaming-kafka-0-8_2.11-2.3.0.jar
metrics-core-2.2.0.jar
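In case it's useful, here's a quick local check I can run to see whether the class from the error (kafka.common.TopicAndPartition) is actually packaged inside any of these jars (a sketch; the paths assume the jars sit in the current directory):

import zipfile

jars = [
    "spark-sql-kafka-0-10_2.11-2.3.0.jar",
    "spark-streaming-kafka-0-8_2.11-2.3.0.jar",
    "metrics-core-2.2.0.jar",
]
target = "kafka/common/TopicAndPartition.class"

# A jar is just a zip archive, so we can list its entries directly.
for jar in jars:
    with zipfile.ZipFile(jar) as zf:
        found = target in zf.namelist()
    print(f"{jar}: {'contains' if found else 'does NOT contain'} {target}")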
Is this some kind of configuration issue, or is there something wrong in the code?
Edit: I am using Livy to submit the job to the cluster. Below is the POST request code:
import json

import requests

# IP_ADDRESS and the stream dict (executors, cores, memory, path, name)
# are defined elsewhere in the script.
headers = {
    'X-Requested-By': 'admin',
    'Content-Type': 'application/json',
}
data = {
    "numExecutors": stream['executors'],
    "executorCores": stream['cores'],
    "executorMemory": stream['memory'],
    "driverMemory": "2g",
    "file": stream['path'],
    "queue": "default",
    "pyFiles": [
        "hdfs:///user/bds/elastic_search/es_pyspark_handler.py",
        "hdfs:///user/bds/realtime/OwlsenseStream.py"
    ],
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.yammer.metrics:metrics-core:2.2.0"
    },
    "name": stream['name']
}
data = json.dumps(data)
response = requests.post(url=f"http://{IP_ADDRESS}:8998/batches", headers=headers, data=data,
                         verify=False)
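After submitting, I poll the batch state like this (a sketch; Livy's POST /batches response includes the batch id, and GET /batches/{id}/state returns the current state):

import time

batch_id = response.json()["id"]
while True:
    state = requests.get(f"http://{IP_ADDRESS}:8998/batches/{batch_id}/state",
                         headers=headers).json()["state"]
    print(f"batch {batch_id}: {state}")
    if state in ("success", "dead", "killed"):
        break
    time.sleep(5)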