# Creating PySpark Object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("XMLParser").getOrCreate()
sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret)
Then I am able to read the file using following code from my s3 bucket
df = spark.read.format("xml").options(rootTag='returnResult', rowTag="query").load("s3n://bucketName/folder/file.xml")
But when I tried to write back to s3 using delta lake (parquet file) using this code
df.write.format("delta").mode('overwrite').save("s3n://bucket/folder/file")
I am getting this error
Py4JJavaError: An error occurred while calling o778.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html " for details.
at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:157)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:434)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.doCommit(OptimisticTransaction.scala:415)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:326)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:284)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:67)
at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3n.impl=null: No AbstractFileSystem configured for scheme: s3n
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
... 53 more
I tried to follow the link given in the stacktrace, but not able to figure out how can i resolve this. Any help would be appericiated
hadoop-aws
are you using? – Party