Spark Parquet S3 error: AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xxxxx, AWS Error Code: null

I am trying to read a Parquet file stored in AWS S3 and I am getting the error below.

17/12/19 11:27:40 DEBUG DAGScheduler: ShuffleMapTask finished on 0
17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ResultStage 2)
17/12/19 11:27:40 DEBUG DAGScheduler: missing: List(ShuffleMapStage 1)
17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ShuffleMapStage 1)
17/12/19 11:27:40 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1, runningTasks: 2
17/12/19 11:27:40 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 4, ip-xxx-xxx-xxx-xxx.ec2.internal): com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xxxxxxx, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: xxxxxxxx/xxxxxxxxx=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:688)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:71)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
    at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
    at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
    at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:180)
    at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ResultStage 2)

After enabling DEBUG logging in Spark, I could also see an NPE while the Spark job was being submitted:

17/12/19 11:27:39 INFO SparkContext: Starting job: count at TestRead.scala:54
17/12/19 11:27:39 INFO DAGScheduler: Registering RDD 4 (count at TestRead.scala:54)
17/12/19 11:27:39 INFO DAGScheduler: Got job 1 (count at TestRead.scala:54) with 1 output partitions
17/12/19 11:27:39 INFO DAGScheduler: Final stage: ResultStage 2 (count at TestRead.scala:54)
17/12/19 11:27:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
17/12/19 11:27:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
17/12/19 11:27:39 DEBUG DAGScheduler: submitStage(ResultStage 2)
17/12/19 11:27:39 DEBUG DAGScheduler: missing: List(ShuffleMapStage 1)
17/12/19 11:27:39 DEBUG DAGScheduler: submitStage(ShuffleMapStage 1)
17/12/19 11:27:39 DEBUG DAGScheduler: missing: List()
17/12/19 11:27:39 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[4] at count at TestRead.scala:54), which has no missing parents
17/12/19 11:27:39 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 1)
17/12/19 11:27:39 DEBUG ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Failed to use InputSplit#getLocationInfo.
java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)
    at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:114)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.rdd.HadoopRDD$.convertSplitLocationInfo(HadoopRDD.scala:412)
    at org.apache.spark.rdd.SqlNewHadoopRDD.getPreferredLocations(SqlNewHadoopRDD.scala:259)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1556)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1555)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1553)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1553)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1556)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1555)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1555)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1553)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1553)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1519)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$14.apply(DAGScheduler.scala:969)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$14.apply(DAGScheduler.scala:969)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:969)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:924)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:923)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:923)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/12/19 11:27:39 DEBUG ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Failed to use InputSplit#getLocationInfo.

I am not sure why I am getting this error. Here is my code:

val conf = new SparkConf(true).setAppName("TestRead")
conf.set(SPARK_CASS_CONN_HOST, config.cassHost)
conf.set("spark.cassandra.connection.timeout_ms","10000")

val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.access.key", config.s3AccessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", config.s3SecretKey)

val sqlContext = new SQLContext(sc)

val data = sqlContext.read.parquet(s"s3a://${config.s3bucket}/${config.s3folder}")

println(s"data size is ${data.count()}")
data.show()
if(config.cassWriteToTable){
      println("Writing to Cassandra Table")
      data.write.mode(SaveMode.Append).format("org.apache.spark.sql.cassandra").options(Map("table" -> config.cassTable, "keyspace" -> config.cassKeyspace)).save()
}

println("Stopping TestRead...")
sc.stop()

I have included the following dependencies in my build.sbt file:

"org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided,test" ,
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided",
  "com.typesafe.play" % "play-json_2.10" % "2.4.6" excludeAll(ExclusionRule(organization = "com.fasterxml.jackson.core")),
  "mysql" % "mysql-connector-java" % "5.1.39",
  "com.amazonaws" % "aws-java-sdk-pom" % "1.11.7" exclude("commons-beanutils","commons-beanutils") exclude("commons-collections","commons-collections") excludeAll ExclusionRule(organization = "javax.servlet")

What could be the issue here?

Alamo answered 19/12, 2017 at 11:59

403/Forbidden: the login you are using does not have access to the file you are trying to read.
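
One quick way to confirm this really is a permissions problem is to request the same object with the same keys outside Spark. Below is a minimal sketch using the AWS Java SDK 1.11.x (the same SDK family referenced in the question's build); the bucket, key and credential strings are placeholders, not values taken from the question:

// Reproduce the 403 outside Spark with the same credentials the job uses.
// Bucket, key and credential values below are placeholders.
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.AmazonS3Exception

object S3AccessCheck {
  def main(args: Array[String]): Unit = {
    val credentials = new BasicAWSCredentials("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY")
    val s3 = new AmazonS3Client(credentials) // deprecated constructor, but present in aws-java-sdk 1.11.x
    try {
      // Same call the stack trace shows failing: getObjectMetadata issues a HEAD request on the object
      val meta = s3.getObjectMetadata("your-bucket", "your/folder/part-00000.parquet")
      println(s"OK, object is ${meta.getContentLength} bytes")
    } catch {
      case e: AmazonS3Exception =>
        println(s"S3 rejected the request: HTTP ${e.getStatusCode}, error code ${e.getErrorCode}")
    }
  }
}

If this also returns 403, the problem lies with the IAM user/role or the bucket policy rather than with Spark.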

Regarding the NPE, file a bug report against Spark at issues.apache.org; the developers will work out which component is at fault.

Before you do that, though, make sure you are using an up-to-date Spark version and search for that NPE first; there is no need to file a duplicate, especially if it has already been fixed.

Centrosome answered 1/1, 2018 at 13:25

I had a similar issue and resolved it by upgrading the Hadoop library from 2.7.x to 2.8.5.

From the docs (https://hadoop.apache.org/docs/r2.8.5/):

S3A improvements: add ability to plug in any AWSCredentialsProvider, support reading s3a credentials from the Hadoop credential provider API in addition to XML configuration files, support Amazon STS temporary credentials
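
If you build with sbt as the question does, the upgrade is largely a matter of pulling a matching hadoop-aws artifact onto the classpath; a rough sketch, with the version to be matched to the Hadoop version your Spark distribution was built against:

// Indicative build.sbt change; match hadoop-aws to your cluster's Hadoop version.
// hadoop-aws provides the S3A filesystem and declares the AWS SDK version it was compiled against.
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.8.5"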

You may also want to use "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider" as the credentials provider and specify a session token in addition to the access and secret keys.
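
Adapting the question's setup, that configuration would look roughly like the snippet below. The fs.s3a.aws.credentials.provider and fs.s3a.session.token settings exist from Hadoop 2.8 onwards, and config.s3SessionToken is a hypothetical field standing in for wherever you obtain the STS session token:

// Sketch, assuming Hadoop 2.8+ on the classpath: authenticate S3A with STS temporary credentials.
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc.hadoopConfiguration.set("fs.s3a.access.key", config.s3AccessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", config.s3SecretKey)
// config.s3SessionToken is a hypothetical field, not part of the question's config object
sc.hadoopConfiguration.set("fs.s3a.session.token", config.s3SessionToken)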

Icaria answered 21/2, 2019 at 22:59
