spark - How to reduce the shuffle size of a JavaPairRDD<Integer, Integer[]>?

I have a JavaPairRDD<Integer, Integer[]> on which I want to perform a groupByKey action.

The groupByKey action gives me a:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

which is practically an OutOfMemory error, if I am not mistaken. This occurs only in big datasets (in my case when "Shuffle Write" shown in the Web UI is ~96GB).

I have set:

spark.serializer org.apache.spark.serializer.KryoSerializer

in $SPARK_HOME/conf/spark-defaults.conf, but I am not sure if Kryo is used to serialize my JavaPairRDD.

Is there something else that I should do to use Kryo, apart from setting this conf parameter, to serialize my RDD? I can see in the serialization instructions that:

Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.

and that:

Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.

I also noticed that when I set spark.serializer to be Kryo, the Shuffle Write in the Web UI increases from ~96GB (with default serializer) to 243GB!
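
For reference, this is roughly how I understand the programmatic setup would look (just a sketch; the app name is illustrative and the registered classes are only a guess at what matters for my RDD, since I am not sure explicit registration changes anything for these types):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class KryoSetup {
        public static JavaSparkContext context() {
            SparkConf conf = new SparkConf()
                .setAppName("shuffle-size-test")  // illustrative name
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // explicit registration, so Kryo does not write full class names with every record
                .registerKryoClasses(new Class<?>[]{Integer.class, Integer[].class});
            return new JavaSparkContext(conf);
        }
    }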

EDIT: In a comment, I was asked about the logic of my program, in case groupByKey can be replaced with reduceByKey. I don't think it's possible, but here it is anyway:

  • Input has the form:

    • key: index bucket id,
    • value: Integer array of entity ids in this bucket
  • The shuffle write operation produces pairs in the form:

    • entityId
    • Integer array of all entity Ids in the same bucket (call them neighbors)
  • The groupByKey operation gathers all the neighbor arrays of each entity, some possibly appearing more than once (in many buckets).

  • After the groupByKey operation, I keep a weight for each bucket (based on the number of negative entity ids it contains) and for each neighbor id I sum up the weights of the buckets it belongs to.

  • I normalize the scores of each neighbor id with another value (let's say it's given) and emit the top-3 neighbors per entity.

The number of distinct keys that I get is around 10 million (around 5 million positive entity ids and 5 million negatives).
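
To make the first two bullets concrete, here is roughly what they look like in my code, simplified (class and method names are only illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    public class NeighborGrouping {
        // Input: (bucketId, entityIdsInBucket). For every entity in a bucket, emit
        // (entityId, entityIdsInBucket); then gather all neighbor arrays per entity.
        // Emitting one pair per entity is what makes the shuffle write so large.
        public static JavaPairRDD<Integer, Iterable<Integer[]>> neighbors(
                JavaPairRDD<Integer, Integer[]> buckets) {
            return buckets
                .flatMapToPair(bucket -> {
                    Integer[] entities = bucket._2();
                    List<Tuple2<Integer, Integer[]>> pairs = new ArrayList<>(entities.length);
                    for (Integer entityId : entities) {
                        pairs.add(new Tuple2<>(entityId, entities));
                    }
                    return pairs.iterator();  // Spark 2.x expects an Iterator here
                })
                .groupByKey();
        }
    }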

EDIT2: I tried using Hadoop's Writables (VIntWritable and VIntArrayWritable extending ArrayWritable) instead of Integer and Integer[], respectively, but the shuffle size was still bigger than with the default JavaSerializer.

Then I increased spark.shuffle.memoryFraction from 0.2 to 0.4 (even though it is deprecated in version 2.1.0, there is no description of what should be used instead) and enabled off-heap memory, and the shuffle size was reduced by ~20GB. Even though this does what the title asks, I would prefer a more algorithmic solution, or one that includes better compression.
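
Concretely, these were set in $SPARK_HOME/conf/spark-defaults.conf roughly like this (the off-heap size below is just an example value, not what I used):

    spark.shuffle.memoryFraction   0.4
    spark.memory.offHeap.enabled   true
    spark.memory.offHeap.size      8g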

Recollected answered 11/3, 2017 at 9:28 Comment(8)
This to me looks like a case of "know your data". Without knowing anything about it, I would want to ask (a) how many keys you're likely to have and (b) how feasible is it to load a single key (or a subset of keys) at a time? – Anything
(a) About 10M keys. (b) Loading a single key at a time is feasible, but I cannot know exactly how many Integer arrays I will get for each key, since I cannot yet groupByKey. – Recollected
Can you get the list of keys in advance? If so, the solution might be something like: jsc.parallelize(getKeys()).groupByKey(i -> i).flatMapValues(i -> loadValuesFor(i)) – Anything
I am not sure I get 100% what you mean, but I am afraid I cannot do that (loadValuesFor(i)). – Recollected
Ah, I thought you said loading a single key at a time is feasible. If not, then my suggestion should go out the window. – Anything
@Recollected Why do you need to do groupByKey? What is the processing logic after that? Is it possible to replace it with reduceByKey or combineByKey? How many distinct keys do you have? – Aitken
I guess it would be extremely helpful if you added the processing logic in detail to the original question. In most cases I've seen, it's possible to replace groupByKey with a more performant alternative. – Aitken
@VitaliyKotlyarenko I have updated my question. – Recollected

Short Answer: Use fastutil and maybe increase spark.shuffle.memoryFraction.

More details: The problem with this RDD is that Java has to store boxed objects and object references, which consume much more space than primitive types. In this example, I need to store Integer objects instead of int values. A boxed Java Integer takes 16 bytes, while a primitive Java int takes 4 bytes. Scala's Int type, on the other hand, is a 32-bit (4-byte) type, just like Java's int, which is why people using Scala may not have faced a similar issue.

Apart from increasing spark.shuffle.memoryFraction to 0.4, another nice solution was to use the fastutil library, as suggested in Spark's tuning documentation:

The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this: Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.

This enables storing each element of the int arrays in my pair RDD as a primitive int (i.e., using 4 bytes instead of 16 per element). In my case, I used IntArrayList instead of Integer[]. This made the shuffle size drop significantly and allowed my program to run on the cluster. I also used this library in other parts of the code, where I was building some temporary Map structures. Overall, by increasing spark.shuffle.memoryFraction to 0.4 and using the fastutil library, the shuffle size dropped from 96GB to 50GB (!) with the default Java serializer (not Kryo).
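
As an illustration, the conversion I mean looks roughly like this (class and method names are made up):

    import it.unimi.dsi.fastutil.ints.IntArrayList;
    import org.apache.spark.api.java.JavaPairRDD;

    public class CompactValues {
        // Replace each boxed Integer[] value with a fastutil IntArrayList,
        // which is backed by a primitive int[].
        public static JavaPairRDD<Integer, IntArrayList> compact(
                JavaPairRDD<Integer, Integer[]> input) {
            return input.mapValues(arr -> {
                IntArrayList list = new IntArrayList(arr.length);
                for (Integer v : arr) {
                    list.add(v.intValue());  // 4 bytes per element instead of a ~16-byte boxed Integer
                }
                return list;
            });
        }
    }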

Alternative: I have also tried sorting each int array of an RDD pair and storing the deltas using Hadoop's VIntArrayWritable type (smaller numbers use less space than bigger numbers), but this also required registering VIntWritable and VIntArrayWritable in Kryo, which didn't save any space after all. In general, I think that Kryo only makes things faster, but does not decrease the space needed, though I am still not sure about that.
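
For completeness, the delta part itself is just this (plain Java, independent of the Writable types):

    import java.util.Arrays;

    public class DeltaEncode {
        // Sort the ids, then store each value as the difference from the previous one
        // (the first entry keeps its absolute value); small deltas fit in fewer bytes
        // with a variable-length encoding such as VIntWritable.
        public static int[] deltas(int[] ids) {
            int[] sorted = ids.clone();
            Arrays.sort(sorted);
            int[] out = new int[sorted.length];
            int previous = 0;
            for (int i = 0; i < sorted.length; i++) {
                out[i] = sorted[i] - previous;
                previous = sorted[i];
            }
            return out;
        }
    }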

I am not marking this answer as accepted yet, because someone else might have a better idea, and because I didn't use Kryo after all, as my original question was asking. I hope reading it will help someone else with the same issue. I will update this answer if I manage to further reduce the shuffle size.

Recollected answered 19/3, 2017 at 17:32 Comment(0)

I'm still not really sure what you want to do. However, the fact that you use groupByKey and say that there is no way to do it with reduceByKey makes me more confused.

I think you have rdd = (Integer, Integer[]) and you want something like (Integer, Iterable[Integer[]]), which is why you are using groupByKey. Anyway, I am not really familiar with Java in Spark, but in Scala I would use reduceByKey to avoid the shuffle, e.g. rdd.mapValues(Iterable(_)).reduceByKey(_ ++ _). Basically, you want to convert each value to a list of arrays and then combine the lists together.
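
A rough Java translation of that one-liner might look like the sketch below (just a sketch; names are illustrative and I have not benchmarked it):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;

    public class GroupViaReduce {
        // Wrap each value in a single-element list, then concatenate lists with reduceByKey.
        public static JavaPairRDD<Integer, List<Integer[]>> group(
                JavaPairRDD<Integer, Integer[]> rdd) {
            return rdd
                .mapValues(arr -> {
                    List<Integer[]> one = new ArrayList<Integer[]>();
                    one.add(arr);
                    return one;
                })
                .reduceByKey((a, b) -> {
                    List<Integer[]> merged = new ArrayList<Integer[]>(a.size() + b.size());
                    merged.addAll(a);
                    merged.addAll(b);
                    return merged;
                });
        }
    }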

Repand answered 20/3, 2017 at 19:54 Comment(3)
Thanks for the feedback. I wonder, if this is faster than groupByKey, then why isn't it the default way groupByKey works? That's my reasoning for not trying it so far, but now that you suggest it, I will definitely give it a shot. Any ideas though why groupByKey does not use a combiner (map-side aggregation)? – Recollected
This is not exactly what I was looking for, but I will give you the bounty, as it was closer to what I needed than Armin's answer. I wish I could split it in half and share it. Anyway, the bounty was for attracting attention, and it fulfilled its purpose, so here it is! – Recollected
@Recollected groupByKey is quite a weird implementation. I am not sure why it does not do exactly the same thing as reduceByKey. My guess is that groupByKey is meant for combining all the data with the same key in the same partition, which would be lovely if you have done a lot of transformations (so the data is cut down to a small set) and now want to persist it for further use. With reduceByKey, the idea is more that you only want to send the final combination out. – Repand

I think the best approach that can be recommended here in general (without more specific knowledge of the input data) is to use the persist API on your input RDD.

As step one, I'd try to call .persist(MEMORY_ONLY_SER) on the input RDD to lower memory usage (albeit at a certain CPU overhead, which shouldn't be much of a problem for ints in your case).

If that is not sufficient, you can try out .persist(MEMORY_AND_DISK_SER). If your shuffle still takes so much memory that the input dataset needs to be made even easier on memory, .persist(DISK_ONLY) may be an option, but one that will strongly degrade performance.
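
In Java that would look roughly like this (a sketch, not tested on your job):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.storage.StorageLevel;

    public class PersistBeforeShuffle {
        // Persist the input in serialized form before the wide transformation;
        // switch to MEMORY_AND_DISK_SER() or DISK_ONLY() only if memory is still too tight.
        public static JavaPairRDD<Integer, Iterable<Integer[]>> grouped(
                JavaPairRDD<Integer, Integer[]> input) {
            input.persist(StorageLevel.MEMORY_ONLY_SER());
            return input.groupByKey();
        }
    }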

Gibeon answered 18/3, 2017 at 15:19 Comment(5)
Thanks Armin. Why persist if I am not going to use it later? I have tried that without seeing a reduced shuffle size. I found that "Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it." in (spark.apache.org/docs/latest/…) – Recollected
@Recollected Careful :) I may have explained this badly and will edit depending on your answer: Did you call persist on the input RDD? I.e. first inputRdd.persist(MEMORY_ONLY_SER) and after that inputRdd.reduceByKey(...)? It sounds like you called persist on the shuffle result? – Gibeon
Yes, I called persist on the input RDD, i.e., just before groupByKey. I also tried several options, such as memory only ser, memory and disk ser, and disk only. You are right, my previous comment was not clear. – Recollected
@Recollected Reading your answer now, I think I may have given you an answer to the wrong question, sorry for that. What my suggestion gets you is an overall reduction in memory use (and thereby more free memory for the shuffle), since the input RDD will be stored to disk proactively (instead of simply forcing a spill during the shuffle, which will not always be 100% stable when resources are stretched, due to the nature of how Java GC works). I fully agree that the suggestion won't lower the actual size of the shuffle output. That requires a change to the job itself, I'm afraid, as you point out. – Gibeon
Your answer was useful though and something that I should have mentioned. Thanks! – Recollected
