Why do I need to shuffle my PCollection for it to autoscale on Cloud Dataflow?

Context

I am reading a file from Google Storage in Beam using a process that looks something like this:

data = pipeline | beam.Create(['gs://my/file.pkl']) | beam.ParDo(LoadFileDoFn)

Where LoadFileDoFn loads the file, unpickles it into a Python list of objects and yields each one, so the ParDo produces a PCollection of those objects.
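
For reference, LoadFileDoFn looks roughly like the sketch below (simplified; the exact loading logic isn't important here, and it assumes the pickle contains a list of elements):

import pickle

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class LoadFileDoFn(beam.DoFn):
    def process(self, path):
        # Open the object through Beam's filesystem layer (works for gs:// paths)
        # and emit every unpickled object as its own element of the output PCollection.
        with FileSystems.open(path) as f:
            for obj in pickle.load(f):
                yield obj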

I know I could probably implement a custom source to achieve something similar, but this answer and Beam's own documentation indicate that this pattern of creating a pseudo-dataset of file names and reading them via ParDo is not uncommon, and that custom sources may be overkill.

It also works - I get a PCollection with the correct number of elements, which I can process as I like! However...

Autoscaling problems

The resulting PCollection does not autoscale at all on Cloud Dataflow. I first have to transform it via:

shuffled_data = data | beam.Shuffle()

I know the answer I linked above describes pretty much this process, but it doesn't give any insight into why this is necessary. As far as I can tell from Beam's very high level of abstraction, I have a PCollection with N elements before the shuffle and a similar PCollection after the shuffle. Why does one scale but not the other?

The documentation is not very helpful here (or in general, but that's another matter). What hidden attribute does the first PCollection have, which the second doesn't, that prevents it from being distributed across multiple workers?

Hardened asked 21/12, 2018 at 13:06

When you read via Create you are creating a PCollection that is bound to a single worker. Since there are no keys associated with the items, there is no mechanism to distribute the work. Shuffle() creates key/value pairs underneath the covers and then shuffles them, which allows the PCollection's items to be distributed to new workers as they spin up. You can verify this behaviour by turning off autoscaling and fixing the worker count to, say, 25 - without the Shuffle you will only see one worker doing work.
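
Conceptually, the shuffle step boils down to the key / group-by-key / un-key pattern below - a simplified sketch of what the built-in beam.Reshuffle() transform does, ignoring the timestamp and window handling of the real thing:

import random

import apache_beam as beam


class NaiveReshuffle(beam.PTransform):
    """Simplified sketch of the key -> group -> unkey pattern."""

    def expand(self, pcoll):
        return (
            pcoll
            # Attach an arbitrary key so the runner has something to partition on.
            | 'AddRandomKey' >> beam.Map(lambda v: (random.randint(0, 1000), v))
            # GroupByKey forces a shuffle boundary; after it, the elements can be
            # redistributed across however many workers are available.
            | 'GroupByKey' >> beam.GroupByKey()
            # Drop the keys and re-emit the original elements.
            | 'DropKeys' >> beam.FlatMap(lambda kv: kv[1])
        )

To see the single-worker behaviour for yourself, run the job with --autoscaling_algorithm=NONE --num_workers=25 and compare how many workers are actually busy with and without the shuffle step.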

Another way to distribute the work at Create/Read time would be to build your own custom I/O for reading PKL files. You'd create the appropriate splitter; however, not knowing what you have pickled, it may not be splittable at all. IMO Shuffle() is a safe bet, unless you have a real optimisation to gain by writing a splittable reader.
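
For contrast, this is what a splittable read looks like from the pipeline side - assuming (hypothetically) that the same data could be stored as newline-delimited records under a pattern like gs://my/records-*.txt instead of a single pickle, the built-in ReadFromText source lets the runner parallelise the read without any explicit shuffle:

import apache_beam as beam

with beam.Pipeline() as p:
    # ReadFromText is a splittable source: the runner can divide the input's
    # byte ranges between workers, so no explicit reshuffle is needed.
    records = p | beam.io.ReadFromText('gs://my/records-*.txt')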

Disbelieve answered 21/12, 2018 at 15:42
Comment from Hardened: Thanks, I guess it being bound to one worker explains it. I sort of expected a PCollection without explicit keys to be scalable by default (it being a collection of independent objects and such), but I suppose restricting it to one worker is the cautious approach. If it all hinges on keys, then the reason the shuffle approach works also becomes a bit more transparent (I found it really non-obvious at first).
