java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

I am new to Spark and Delta Lake and am trying to do a POC with PySpark, using MinIO as the Delta Lake storage backend. However, I am getting the error

Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

I have added the package coordinates in the Python code, assuming Spark would download the required JAR at runtime. I am not able to understand what I am doing wrong.

Can someone please help me out?

Thanks

ENV

OS: Windows 11

Spark: Apache Spark 3.3.1

Java Version: openjdk version "11.0.13" 2021-10-19

Python Version: 3.9.13

Python packages: pyspark 3.2.3, delta-spark 2.0.2

CODE

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key", <my key>) \
    .config("spark.hadoop.fs.s3a.secret.key", <my secret>) \
    .config("spark.hadoop.fs.s3a.endpoint", <my endpoint>) \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

data = spark.range(0, 5)

data.write.format("delta").save("s3a://<my bucket>/delta-lake/demo")

df = spark.read.format("delta").load("tmp/delta-table")
df.show()

OUTPUT

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/C:/Spark/spark-3.2.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/C:/Spark/spark-3.2.3-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\shari\.ivy2\cache
The jars for the packages stored in: C:\Users\shari\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-050502ed-ca85-4e4f-b7a3-ff69c12689d3;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.0.2 in central
        found io.delta#delta-storage;2.0.2 in central
        found org.antlr#antlr4-runtime;4.8 in central
        found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 173ms :: artifacts dl 0ms
        :: modules in use:
        io.delta#delta-core_2.12;2.0.2 from central in [default]
        io.delta#delta-storage;2.0.2 from central in [default]
        org.antlr#antlr4-runtime;4.8 from central in [default]
        org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-050502ed-ca85-4e4f-b7a3-ff69c12689d3
        confs: [default]
        0 artifacts copied, 4 already retrieved (0kB/0ms)
23/02/16 16:19:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "D:\DLT\quin\quin-experian-elt\el\delta_test.py", line 19, in <module>
    data.write.format("delta").save("s3a://quin-third-party-data-dev-1/delta-lake/demo")
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pyspark\sql\readwriter.py", line 740, in save
    self._jwrite.save(path)
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pyspark\sql\utils.py", line 111, in deco
    return f(*a, **kw)
  File "C:\Users\shari\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o61.save.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2667)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:620)
        at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:530)
        at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:153)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:349)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2571)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2665)
        ... 51 more

SUCCESS: The process with PID 6172 (child process of PID 24096) has been terminated.
SUCCESS: The process with PID 24096 (child process of PID 27152) has been terminated.
SUCCESS: The process with PID 27152 (child process of PID 25700) has been terminated.
Remsen answered 16/2, 2023 at 12:24 Comment(1)

The problem is most probably caused by the configure_spark_with_delta_pip function, which overrides the spark.jars.packages option you used to specify the Hadoop AWS dependency.

To fix it, pass that dependency as an additional parameter to the same function:

import pyspark
from delta import configure_spark_with_delta_pip

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key", <my key>) \
    .config("spark.hadoop.fs.s3a.secret.key", <my secret>) \
    .config("spark.hadoop.fs.s3a.endpoint", <my endpoint>) \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
my_packages = ["org.apache.hadoop:hadoop-aws:3.3.1"]
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

P.S. Check the version compatibility of your Spark with Delta - see this compatibility matrix to find the correct version.
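
As an illustration (not part of the original answer), you can print which Spark and Hadoop versions your installation actually runs; the hadoop-aws and Delta versions should be matched against these:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("version-check").getOrCreate()
print("Spark :", spark.version)
# Hadoop version bundled with this Spark build, read through the JVM gateway
print("Hadoop:", spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())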

Falter answered 16/2, 2023 at 13:28 Comment(3)
It seemed to work. Now it's downloading the required packages at runtime. However, I encountered the error below: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'. I can't understand what I am missing here. Any hints? – Remsen
Most probably Hadoop compatibility: sparkbyexamples.com/spark/… – Falter
It turned out that I was missing the hadoop.dll file. It was a great help. Much appreciated, thanks. – Remsen
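
A side note on the hadoop.dll fix mentioned above (an illustrative sketch, not from the comments): on Windows, Hadoop's native I/O needs winutils.exe and hadoop.dll matching the bundled Hadoop version, and they must be discoverable before the SparkSession is created. A common setup, assuming the binaries were downloaded to C:\hadoop\bin:

import os

# Assumed location of the downloaded native binaries (winutils.exe, hadoop.dll)
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin" + os.pathsep + os.environ["PATH"]

# Create the SparkSession only after these variables are set; some setups
# additionally copy hadoop.dll into C:\Windows\System32.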

It seems that your error is caused by missing AWS JARs in the Spark image/distribution that you're using.

Spark needs hadoop-aws in order to interface with S3A.

You'll need to add the hadoop-aws jar to your classpath. This should contain the class that's currently missing: org.apache.hadoop.fs.s3a.S3AFileSystem.
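
For illustration, a minimal way to pull hadoop-aws at session startup is sketched below; the 3.3.1 version is an assumption and should match the Hadoop version bundled with your Spark build. If you use configure_spark_with_delta_pip, pass the coordinate through its extra_packages argument instead, as shown in the answer above.

from pyspark.sql import SparkSession

# hadoop-aws pulls in its aws-java-sdk-bundle dependency transitively;
# pick the version that matches the Hadoop shipped with your Spark build.
spark = SparkSession.builder.appName("s3a-classpath-check") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1") \
    .getOrCreate()

# With the package resolved, s3a:// paths can be read and written
# (given fs.s3a.* credential/endpoint settings), e.g.:
# spark.range(5).write.parquet("s3a://<my bucket>/smoke-test")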

Once you're able to write data to S3A from Spark, you'll next need to verify that you are using the S3A committers to write your data. The default committers are not optimised for S3A.


Read this for more information on the S3A committers:

https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html
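
For illustration only (not part of this answer), enabling the S3A "magic" committer in PySpark typically looks like the sketch below. It assumes Spark's spark-hadoop-cloud module is on the classpath; note that Delta tables commit through Delta's own transaction log, so this mainly matters for plain Parquet/ORC output written directly to S3A.

from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("s3a-magic-committer") \
    .config("spark.hadoop.fs.s3a.committer.name", "magic") \
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true") \
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol") \
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

# builder.getOrCreate() then writes to s3a:// paths using the magic committer.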


Link to maven repository for hadoop-aws:

https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

Asaph answered 16/2, 2023 at 12:38 Comment(0)

In order to make the example work, I needed even more JARs until all classes were found; my complete configuration is as follows. The version numbers are compatible with delta-spark_2.12:3.0.0.

import pyspark
from delta import configure_spark_with_delta_pip

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
            .config("spark.hadoop.fs.s3a.access.key", "TODO") \
            .config("spark.hadoop.fs.s3a.secret.key", "TODO") \
            .config("spark.hadoop.fs.s3a.endpoint", "http://thenameoftheminiohost:9000") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \

# spark.hadoop.fs.s3a.path.style.access
# see https://mcmap.net/q/394640/-spark-path-style-access-with-fs-s3a-path-style-access-property-is-not-working
# see https://github.com/minio/minio/issues/7020

# https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
# and also https://github.com/delta-io/delta/issues/895
# and also https://mcmap.net/q/393205/-java-lang-noclassdeffounderror-org-apache-hadoop-fs-storagestatistics
# and also https://mcmap.net/q/394641/-noclassdeffounderror-raised-when-reading-minio-data-using-pyspark
my_packages = ["org.apache.hadoop:hadoop-aws:3.3.4",
               "org.apache.hadoop:hadoop-client-runtime:3.3.4",
               "org.apache.hadoop:hadoop-client-api:3.3.4",
               "io.delta:delta-contribs_2.12:3.0.0",
               "io.delta:delta-hive_2.12:3.0.0",
               "com.amazonaws:aws-java-sdk-bundle:1.12.603",
               ]

# Create a Spark instance with the builder
# As a result, you now can read and write Delta tables
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html
spark.sparkContext.setLogLevel("DEBUG") # or TRACE, useful if there are connection issues!

spark.range(5).write.mode("overwrite").format("delta").save("s3a://nameofmybucket/five")
print(f"done saving")

pip list outputs the following:

Package            Version
------------------ ------------
delta-spark        3.0.0
importlib-metadata 6.9.0
numpy              1.26.2
pandas             2.1.3
pip                20.3.4
py4j               0.10.9.7
pyarrow            14.0.1
pyspark            3.5.0
python-dateutil    2.8.2
pytz               2023.3.post1
setuptools         52.0.0
six                1.16.0
tzdata             2023.3
wheel              0.34.2
zipp               3.17.0
Quondam answered 3/12, 2023 at 0:53 Comment(0)
