Prevent fusion in Apache Beam / Dataflow streaming (Python) pipelines to remove a pipeline bottleneck

We are currently working on a streaming pipeline on Apache Beam with the DataflowRunner. We read messages from Pub/Sub, do some processing on them, and afterwards window them into sliding windows (currently the window size is 3 seconds and the interval is 3 seconds as well). Once a window fires we do some post-processing on the elements inside the window. This post-processing step takes significantly longer than the window size, about 15 seconds.

The Apache Beam code of the pipeline:

import apache_beam as beam
from apache_beam.transforms.trigger import AfterCount, AccumulationMode

input = (pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
                  | beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
                                  trigger=AfterCount(30),
                                  accumulation_mode=AccumulationMode.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)

As you know, Dataflow tries to perform some optimizations on your pipeline steps. In our case it fuses everything together from the windowing onwards (clustered operations: 1/ processing, 2/ windowing + post-processing), which causes slow, sequential post-processing of all the windows by just one worker. We see logs every 15 seconds showing that the pipeline is processing the next window. However, we would like multiple workers to pick up separate windows instead of the whole workload going to a single worker.

Therefore we were looking for ways to prevent this fusion from happening, so that Dataflow separates the windowing from the post-processing of the windows. That way we would expect Dataflow to again assign multiple workers to the post-processing of fired windows.

What we have tried so far:

  • Increasing the number of workers to 20, 30 or even 40, but without effect. Only the steps before the windowing get assigned to multiple workers
  • Running the pipeline for 5 or 10 minutes, but we noticed no worker re-allocation to help with the larger post-processing step after the windowing
  • After the windowing, putting the elements back into a global window
  • Simulating another GroupByKey with a dummy key (as described in https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion), but without any success; see the sketch after this list
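
For reference, the dummy-key fusion break from that last bullet looked roughly like this. It is only a sketch: the step names and the 0-9 dummy-key range are illustrative, not our exact code.

import random
import apache_beam as beam

# Re-key each fired window under a small random dummy key, group again to force
# a materialization point (which should break fusion), then drop the dummy key.
unfused = (group
           | "AddDummyKey" >> beam.Map(lambda elem: (random.randint(0, 9), elem))
           | "BreakFusion" >> beam.GroupByKey()
           | "DropDummyKey" >> beam.FlatMap(lambda kv: kv[1]))
unfused | beam.Map(post_processing_fn)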

The last two actions indeed created a third clustered operation (1/ processing, 2/ windowing, 3/ post-processing), but we noticed that the same worker still executes everything after the windowing.

Is there any solution that can resolve this problem?

The workaround we are currently considering is to build another streaming pipeline which receives the fired windows, so that its workers can process the windows in parallel, but it is cumbersome.
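
For completeness, that workaround would look roughly like the sketch below: the first pipeline serializes each fired window and publishes it to a hand-off Pub/Sub topic, and a second pipeline reads the windows back and post-processes them with its own workers. The topic and subscription names and the JSON serialization are purely illustrative, and post_processing_fn would have to accept the deserialized payload instead of a (key, values) pair.

import json
import apache_beam as beam

# Pipeline 1: publish each fired window to a hand-off topic (illustrative names).
(group
 | beam.Map(lambda kv: json.dumps({"key": kv[0], "values": list(kv[1])}).encode("utf-8"))
 | beam.io.WriteToPubSub(topic="projects/<project>/topics/<handoff_topic>"))

# Pipeline 2: read the windows back and post-process them in parallel.
(pipeline2
 | beam.io.ReadFromPubSub(subscription="projects/<project>/subscriptions/<handoff_sub>")
 | beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
 | beam.Map(post_processing_fn))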

Septarium answered 20/2, 2019 at 10:30 Comment(1)
Have you tried contacting Google Support? For specific, internal things like this, they are often better able to help than the community. – Serration

You have done the right thing by trying to break fusion of your elements. I suspect a different issue is getting you into trouble.

For streaming, a single key is always processed on the same worker. By any chance, are all or most of your records assigned to a single key? If so, your processing will be done on a single worker.

Something that you can do to prevent this is to make the window a part of the key, so that the elements for multiple windows can be processed in different workers even though they have the same key:

class KeyIntoKeyPlusWindow(beam.DoFn):
  # Make the window part of the key so that elements from different windows of
  # the same key can be grouped and processed on different workers.
  def process(self, element, window=beam.DoFn.WindowParam):
    key, values = element
    yield ((key, window), values)

group = windows | beam.ParDo(KeyIntoKeyPlusWindow()) | beam.GroupByKey()

And once you've done that, you can apply your post-processing:

group | beam.Map(post_processing_fn)
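
Note that, with the window folded into the key, each grouped element now arrives as ((key, window), values). If post_processing_fn still expects plain (key, values) pairs, you may want to strip the window off the key first; a minimal sketch:

(group
 | beam.Map(lambda kv: (kv[0][0], kv[1]))  # ((key, window), values) -> (key, values)
 | beam.Map(post_processing_fn))
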
Obtect answered 11/3, 2019 at 23:5 Comment(0)
