I have a JavaPairRDD<Integer, Integer[]> on which I want to perform a groupByKey operation.
The groupByKey call gives me a:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
which is practically an OutOfMemory error, if I am not mistaken. This occurs only with big datasets (in my case, when the "Shuffle Write" shown in the Web UI is ~96GB).
I have set:
spark.serializer org.apache.spark.serializer.KryoSerializer
in $SPARK_HOME/conf/spark-defaults.conf, but I am not sure if Kryo is used to serialize my JavaPairRDD.
Is there something else that I should do to use Kryo, apart from setting this conf parameter, to serialize my RDD? I can see in the serialization instructions that:
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.
and that:
Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
I also noticed that when I set spark.serializer to be Kryo, the Shuffle Write in the Web UI increases from ~96GB (with default serializer) to 243GB!
EDIT: In a comment, I was asked about the logic of my program, in case groupByKey can be replaced with reduceByKey. I don't think it's possible, but here it is anyway:
Input has the form:
- key: index bucket id,
- value: Integer array of entity ids in this bucket
The shuffle write operation produces pairs in the form:
- key: entity id,
- value: Integer array of all entity ids in the same bucket (call them neighbors)
The groupByKey operation gathers all the neighbor arrays of each entity, some possibly appearing more than once (in many buckets).
After the groupByKey operation, I keep a weight for each bucket (based on the number of negative entity ids it contains), and for each neighbor id I sum up the weights of the buckets it belongs to. I normalize the scores of each neighbor id with another value (let's say it's given) and emit the top-3 neighbors per entity.
The number of distinct keys that I get is around 10 million (around 5 million positive entity ids and 5 million negatives).
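To make the logic above concrete, here is a minimal plain-Java sketch of it, without Spark. The class and method names are hypothetical, the weight formula is only a placeholder (the real one depends on the number of negative ids in the bucket but is not shown here), the normalization step is left out, and excluding an entity from its own neighbor list is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NeighborSketch {

    // Placeholder weight (assumption): fewer negative ids means a heavier bucket.
    static double bucketWeight(int[] bucket) {
        long negatives = Arrays.stream(bucket).filter(id -> id < 0).count();
        return 1.0 / (1 + negatives);
    }

    // Emits (entityId, bucket) for every member of every bucket and collects
    // the arrays per entity id, which is what the groupByKey step does.
    static Map<Integer, List<int[]>> groupNeighbors(Map<Integer, int[]> buckets) {
        Map<Integer, List<int[]>> grouped = new HashMap<>();
        for (int[] bucket : buckets.values()) {
            for (int entityId : bucket) {
                grouped.computeIfAbsent(entityId, k -> new ArrayList<>()).add(bucket);
            }
        }
        return grouped;
    }

    // Sums the bucket weights per neighbor id and returns the k best neighbors.
    // Ties are broken by the smaller id so the result is deterministic.
    static List<Integer> topNeighbors(int entityId, List<int[]> neighborArrays, int k) {
        Map<Integer, Double> scores = new HashMap<>();
        for (int[] bucket : neighborArrays) {
            double weight = bucketWeight(bucket);
            for (int neighbor : bucket) {
                if (neighbor != entityId) {
                    scores.merge(neighbor, weight, Double::sum);
                }
            }
        }
        List<Map.Entry<Integer, Double>> entries = new ArrayList<>(scores.entrySet());
        entries.sort((a, b) -> {
            int byScore = Double.compare(b.getValue(), a.getValue());
            return byScore != 0 ? byScore : Integer.compare(a.getKey(), b.getKey());
        });
        List<Integer> top = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }

    public static void main(String[] args) {
        Map<Integer, int[]> buckets = new HashMap<>();
        buckets.put(1, new int[] {1, 2, -3});
        buckets.put(2, new int[] {1, 2, 4});
        Map<Integer, List<int[]>> grouped = groupNeighbors(buckets);
        for (Map.Entry<Integer, List<int[]>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + " -> " + topNeighbors(e.getKey(), e.getValue(), 3));
        }
    }
}
```

The point of the sketch is that every entity needs all of its neighbor arrays before the per-neighbor sums can be ranked, and each bucket array is repeated once per member, which is where the shuffle volume comes from.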
EDIT2: I tried using Hadoop's Writables (VIntWritable and VIntArrayWritable extending ArrayWritable) instead of Integer and Integer[], respectively, but the shuffle size was still bigger than with the default JavaSerializer.
Then I increased spark.shuffle.memoryFraction from 0.2 to 0.4 (even though it is deprecated in version 2.1.0, there is no description of what should be used instead) and enabled offHeap memory, and the shuffle size was reduced by ~20GB. Even though this does what the title asks, I would prefer a more algorithmic solution, or one that includes better compression.
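As a rough sanity check on how much the value representation itself costs, a standalone comparison like the one below could be run outside Spark. It is only a sketch: the class and method names are hypothetical, and it measures plain java.io serialization, so Kryo and the actual shuffle files (which Spark also compresses by default) will produce different numbers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class SerializedSizeCheck {

    // Size in bytes of a value under plain java.io serialization.
    static int javaSerializedSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Boxed array of n distinct ids, as in Integer[] values.
    static Integer[] boxedIds(int n) {
        Integer[] ids = new Integer[n];
        for (int i = 0; i < n; i++) {
            ids[i] = 1_000_000 + i;
        }
        return ids;
    }

    // The same ids as a primitive array.
    static int[] primitiveIds(int n) {
        int[] ids = new int[n];
        for (int i = 0; i < n; i++) {
            ids[i] = 1_000_000 + i;
        }
        return ids;
    }

    public static void main(String[] args) {
        int n = 1000;
        System.out.println("Integer[]: " + javaSerializedSize(boxedIds(n)) + " bytes");
        System.out.println("int[]:     " + javaSerializedSize(primitiveIds(n)) + " bytes");
    }
}
```

Under java.io serialization every boxed Integer is written as a separate object, so the Integer[] form comes out larger than the int[] form for the same ids. Whether switching the value type to int[] helps in the real job would still have to be measured in the Web UI.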
jsc.parallelize(getKeys()).groupByKey(i -> i).flatMapValues(i -> loadValuesFor(i))
– Anything