Apache Beam/Dataflow Reshuffle

What is the purpose of org.apache.beam.sdk.transforms.Reshuffle? The documentation defines its purpose as:

A PTransform that returns a PCollection equivalent to its input but operationally provides some of the side effects of a GroupByKey, in particular preventing fusion of the surrounding transforms, checkpointing and deduplication by id.

What is the benefit of preventing fusion of the surrounding transforms? I thought fusion was an optimization to prevent unnecessary steps. An actual use case would be helpful.

Lungwort answered 10/1, 2019 at 3:39

There are a couple of cases when you may want to reshuffle your data. The following is not an exhaustive list, but it should give you an idea about why you may reshuffle:

When one of your ParDo transforms has a very high fanout

This means that the parallelism is increased after your ParDo. If you don't break the fusion here, your pipeline will not be able to split the data across multiple machines to process it.

Consider the extreme case of a DoFn that generates a million output elements for every input element, and suppose this ParDo receives 10 elements in its input. If you don't break fusion between this high-fanout ParDo and its downstream transforms, the downstream work can only run on 10 machines, even though you will have millions of elements to process.

  • A good way to diagnose this is looking at the number of elements in an input PCollection vs. the number of elements in an output PCollection. If the latter is significantly larger than the former, then you may want to consider adding a reshuffle.
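
As a minimal sketch of this case in the Beam Java SDK (the input values, the fanout factor of one million, and the step names "Expand" and "Process" are made up for illustration), placing Reshuffle.viaRandomKey() between the high-fanout ParDo and its consumer breaks fusion, so the runner can redistribute the expanded elements across many more workers:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    public class FanoutExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // A tiny input: with fused stages, the whole pipeline can use at
        // most as many workers as there are input elements.
        PCollection<String> seeds = p.apply(Create.of("a", "b", "c"));

        // High-fanout ParDo: every input element produces a million outputs.
        PCollection<String> expanded = seeds.apply("Expand", ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void processElement(@Element String seed,
                                         OutputReceiver<String> out) {
                for (int i = 0; i < 1_000_000; i++) {
                  out.output(seed + "-" + i);
                }
              }
            }));

        // Break fusion here so the runner can spread the millions of
        // expanded elements across many workers instead of the original few.
        PCollection<String> rebalanced = expanded.apply(Reshuffle.viaRandomKey());

        rebalanced.apply("Process", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(@Element String element) {
            // ... expensive per-element work goes here ...
          }
        }));

        p.run().waitUntilFinish();
      }
    }

Keep in mind that the reshuffle itself is not free (the data is materialized and redistributed), so it pays off when the downstream per-element work dominates.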

When your data is not well balanced across machines

Imagine that your pipeline consumes 9 files of 10MB and one file of 10GB. If each file is read by a single machine, you will have one machine with a lot more data than the others.

If you don't reshuffle this data, most of your machines will be idle while your pipeline runs. Reshuffling it allows you to rebalance the data to be processed more evenly across machines.

  • A good way to diagnose this is by looking at how many workers are executing work in your pipeline. If the pipeline is slow, and there is only one worker processing data, then you can benefit from a reshuffle.
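
Here is a sketch of the same idea for a skewed read (the bucket path below is hypothetical): reshuffling right after the source breaks fusion between the read and the downstream processing, so lines coming from the one large file are no longer pinned to the worker that read them.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    public class BalancedReadExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Hypothetical input: nine 10MB files and one 10GB file match this glob.
        PCollection<String> lines =
            p.apply(TextIO.read().from("gs://my-bucket/input/*.txt"));

        // Break fusion between the read and downstream processing so the
        // runner can rebalance lines from the large file across workers.
        PCollection<String> balanced = lines.apply(Reshuffle.viaRandomKey());

        // ... downstream transforms applied to `balanced` ...

        p.run().waitUntilFinish();
      }
    }

This matters most when the source cannot be split further on its own, for example compressed files, as discussed in the comments below.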
Ostrom answered 10/1, 2019 at 15:24
I see. Are there any indications I could observe through the Stackdriver graphs or logs that point to a need for reshuffling? It would be great to get a signal from the Dataflow workers that reshuffling would improve performance, rather than guessing. – Lungwort
I've edited the answer to address those questions. You can start by looking at the size of the input vs. output PCollections in the Dataflow UI. – Ostrom
Would Dataflow ever redistribute work across multiple workers based on stragglers? For example, with auto-scaling it always tries to optimize resources, so it tries to run with fewer workers. In case 2 of your example, do we need to explicitly state that we need more workers by reshuffling? – Lungwort
That's right. When the data source permits, the work can be split, but this is not always possible. For example, a single compressed block cannot be read in parallel, and neither can a hot key be spread across workers. – Ostrom
@Ostrom What about Read transforms? Say you are reading "gs://dir/*.avro" with 3 avro files, where each contains 100+ million rows. Do you reshuffle after the read transform to be able to use, say, 512 workers? Also, I've seen that a read is Split + Read; shouldn't we actually reshuffle after the split and before the read? If that's the case, how can we do that when the only thing we have in client code is one unbreakable Read transform? – Preciosity
