AWS Glue RDD.saveAsTextFile() raises Class org.apache.hadoop.mapred.DirectOutputCommitter not found

I'm creating a simple ETL job that reads a billion files and re-partitions them (in other words, compacts them into a smaller number of files for further processing).

A simple AWS Glue application:

import org.apache.spark.SparkContext

object Hello {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val input_path =  "s3a://my-bucket-name/input/*"
    val output_path = "s3a://my-bucket-name/output/*"
    val num_partitions = 5
    val ingestRDD = spark.textFile(input_path)
    ingestRDD.repartition(num_partitions).saveAsTextFile(output_path)    
  }
}

raises the following traceback:

ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Exception in User Class: java.lang.RuntimeException : java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.mapred.DirectOutputCommitter not found
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2401)
org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:725)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1048)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
Hello$.main(hello_world_parallel_rdd_scala:18)
Hello.main(hello_world_parallel_rdd_scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
com.amazonaws.services.glue.SparkProcessLauncherPlugin$class.invoke(ProcessLauncher.scala:38)
com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:67)
com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:108)
com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:21)
com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)

At the same time, this code works in a local environment, on a Spark cluster, and on an EMR cluster.

Rider answered 22/12, 2020 at 13:26 Comment(0)
import org.apache.spark.SparkContext

object Hello {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
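    // Use the direct output committer so saveAsTextFile writes straight to the final S3 path
    // instead of a _temporary directory that is renamed afterwards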
    spark.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
    val input_path =  "s3a://my-bucket-name/input/*"
    val output_path = "s3a://my-bucket-name/output/*"
    val num_partitions = 5
    val ingestRDD = spark.textFile(input_path)
    ingestRDD.repartition(num_partitions).saveAsTextFile(output_path)    
  }
}
Rider answered 22/12, 2020 at 13:26 Comment(1)
thanks this worked great! Could you add a bit of text about how this changes the configuration and what it does? – Amatory

Setting the Hadoop configuration for PySpark:

sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
Multiphase answered 7/1, 2021 at 11:54 Comment(1)
thanks this worked great! Could you add a bit of text about how this changes the configuration and what it does? – Amatory

We have to set the DirectFileOutputCommitter depending on the context. If we are using the SparkContext, the output committer is set like this:

spark.hadoopConfiguration.set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")

If we are using the GlueContext, then like this:

glueContext._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")

Why we need this:
Normally the FileOutputCommitter is used. It writes files to a _temporary folder and then renames them to their final location, which works well on HDFS.
The DirectFileOutputCommitter, on the other hand, skips the _temporary location and writes directly to the final location, which is what we need for S3.

Why we need two separate classes:
HDFS does not allow more than one writer at a time for a file, but S3 allows multiple writers to write to the same object.
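
Putting it together, here is a minimal sketch of the job from the question with the committer configured on a plain SparkContext (same placeholder bucket paths as the question; the output path is written without a wildcard, since the glob only makes sense when reading):

import org.apache.spark.SparkContext

object Hello {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    // Write output directly to its final S3 location instead of _temporary + rename
    spark.hadoopConfiguration.set(
      "mapred.output.committer.class",
      "org.apache.hadoop.mapred.DirectFileOutputCommitter")

    val inputPath = "s3a://my-bucket-name/input/*"
    val outputPath = "s3a://my-bucket-name/output"
    val numPartitions = 5

    spark.textFile(inputPath)
      .repartition(numPartitions)
      .saveAsTextFile(outputPath)
  }
}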

Tillis answered 23/6, 2022 at 14:34 Comment(0)
