S3 SlowDown error in Spark on EMR
Asked Answered
D

2

22

I am getting this error when writing a parquet file, this has started to happen recently

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 2CA496E2AB87DC16), S3 Extended Request ID: 1dBrcqVGJU9VgoT79NAVGyN0fsbj9+6bipC7op97ZmP+zSFIuH72lN03ZtYabNIA2KaSj18a8ho=
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1777)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:7)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:125)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:355)
    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy28.deleteAll(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1331)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:296)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:463)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortJob(FileOutputCommitter.java:482)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:134)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:146)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:494)
    at com.radius.network_data.ingestion.processors.NetworkDataPathProcessor.process(NetworkDataPathProcessor.scala:38)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline.com$radius$network_data$ingestion$NetworkDataIngestionPipeline$$processClient(NetworkDataIngestionPipeline.scala:51)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1$$anonfun$apply$1.apply(NetworkDataIngestionPipeline.scala:42)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1$$anonfun$apply$1.apply(NetworkDataIngestionPipeline.scala:41)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1.apply(NetworkDataIngestionPipeline.scala:41)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1.apply(NetworkDataIngestionPipeline.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

any idea about what could be the problem? I am using these write options

result.write
      .mode(SaveMode.Overwrite)
      .partitionBy("entityType")
      .parquet(joinedPath)
Dunford answered 7/9, 2017 at 18:59 Comment(2)
Has your throughput increased lately? How many parquet files are in a batch on average? Generally (docs.aws.amazon.com/AmazonS3/latest/dev/…)Lilithe
I don't think the throughput it has increased a lot... do you think the error might be related with the number of part files?Dunford
D
21

Apparently the problem was caused by writing too many very small part files, it was fixed by reduce the number of output partitions

Dunford answered 11/9, 2017 at 19:35 Comment(2)
How do you reduce the number of output partitions? A code example would help.Pitman
2 ways - remove partitionBy if the partition has a big number of different values - ds.repartition(numPartitions)Dunford
I
-1

This problem occur, When we write too many part files into S3 using Spark.

We can reduce the number of files to 1 per partition using coalesce(1) in spark code. result.coalesce(1).write.mode(SaveMode.Overwrite).partitionBy("entityType").parquet(joinedPath)

Isabelleisac answered 14/12, 2022 at 3:0 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.