Google Dataflow streaming pipeline is not distributing workload over several workers after windowing

I'm trying to set up a Dataflow streaming pipeline in Python; I have quite some experience with batch pipelines. Our basic architecture looks like this: [architecture diagram]

The first step does some basic processing and takes about 2 seconds per message to get to the windowing. We use sliding windows of 3 seconds with a 3-second interval (this might change later, so that the windows overlap). The last step is the SOG prediction, which takes about 15 seconds per window to process and is clearly our bottleneck transform.

The issue we face is that the workload is perfectly distributed over our workers before the windowing, but the most important transform is not distributed at all. All the windows are processed one at a time, seemingly on a single worker, while we have 50 available.

The logs show that the SOG prediction step produces an output only once every ~15 seconds, which should not be the case if the windows were being processed across multiple workers. This builds up huge latency over time, which we don't want: with 1 minute of messages we get a latency of 5 minutes for the last window (at a 3-second interval that is 20 windows, processed sequentially at ~15 seconds each). If the work were distributed, the latency should be only around 15 seconds (the SOG prediction time itself). At this point we are clueless.

Does anyone see something wrong with our code, or know how to prevent or circumvent this? It seems like something happening in the internals of Google Cloud Dataflow. Does this also occur in Java streaming pipelines?

In batch mode, everything works fine. There, one could add a reshuffle to make sure no fusion occurs, but that does not seem possible after windowing in streaming mode.
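
For reference, this is the kind of fusion break we mean; a minimal one-line sketch using beam.Reshuffle on the bboxs collection from the code below. In streaming mode, adding this after the windowing is what fails for us:

redistributed = bboxs | 'BreakFusion' >> beam.Reshuffle()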

import logging
import sys

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

# Snippet from our run(argv=None) entry point; parse_arguments,
# get_pipeline_options and the *_fn helpers are defined elsewhere.
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    # Read frames from Pub/Sub; event time comes from the message attribute.
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

# Per-message preprocessing: this part distributes fine over the workers.
crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

# Sliding windows (3 s size, 3 s interval), firing after 30 elements.
sliding_windows = bboxs | 'Window' >> beam.WindowInto(
    beam.window.SlidingWindows(
        FEATURE_WINDOWS['goal']['window_size'],
        FEATURE_WINDOWS['goal']['window_interval']),
    trigger=AfterCount(30),
    accumulation_mode=AccumulationMode.DISCARDING)

# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
    "window per match per timewindow: # %s, %s",
    str(len(x[1])), x[1][0]['timestamp']))

# Bottleneck transform: ~15 s per window, not distributed over our workers.
sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
                                                  SERVER_URL_INCEPTION,
                                                  SERVER_URL_SOG)

pipeline.run().wait_until_finish()
Mazel answered 19/2, 2019 at 10:31 Comment(0)

In Beam, the unit of parallelism is the key: all the windows for a given key will be produced on the same machine. However, if you have 50+ keys, they should get distributed among all workers.
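
If, as in this pipeline, almost everything shares one key, a common pattern is to fan that hot key out over several shard keys before the GroupByKey. A minimal sketch; NUM_SHARDS, the (key, value) element shape, and the step names are assumptions, not from the question:

import random

import apache_beam as beam

NUM_SHARDS = 50  # assumed shard count, e.g. matching the number of workers

# Attach a random shard id to the key so GroupByKey can parallelize;
# downstream steps must then merge or tolerate the per-shard groups.
sharded = bboxs | 'AddShard' >> beam.Map(
    lambda kv: ((kv[0], random.randint(0, NUM_SHARDS - 1)), kv[1]))
grouped = sharded | 'GroupSharded' >> beam.GroupByKey()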

You mentioned that you were unable to add a Reshuffle in streaming. This should be possible; if you're getting errors please file a bug at https://issues.apache.org/jira/projects/BEAM/issues . Does re-windowing into GlobalWindows make the issue with reshuffling go away?
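
A minimal sketch of that re-windowing suggestion, reusing the names from the question (bboxs, FEATURE_WINDOWS); treat it as an illustration under those assumptions, not a verified fix:

import apache_beam as beam

# Hop into the global window, reshuffle there, then re-apply the sliding
# windows. Reshuffling inside GlobalWindows may avoid the IntervalWindow
# cast error reported in the comments.
reshuffled = (
    bboxs
    | 'ToGlobal' >> beam.WindowInto(beam.window.GlobalWindows())
    | 'Reshuffle' >> beam.Reshuffle()
    | 'ReWindow' >> beam.WindowInto(
        beam.window.SlidingWindows(
            FEATURE_WINDOWS['goal']['window_size'],
            FEATURE_WINDOWS['goal']['window_interval'])))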

Poignant answered 21/2, 2019 at 8:54 Comment(4)
For the reshuffling, the error is the following: org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow – Mazel
Hi Robert, I posted an alternative solution using CombineGlobally; maybe you can advise on my answer how to optimize the load distribution across multiple workers in this case. – Misesteem
@robertwb, we found a very dirty workaround to get this working based on your answer, thanks! One more question: is there a reason why the parallelization is not based on key AND window? Not taking the window into account seems so illogical for low-latency applications like ours. We now have to add the window to the key before going through the windowing (luckily, in this case we know which window the elements will be put into), and only then do we get it to parallelize without added latency. – Mazel
For some windowing functions (e.g. sessions) the window is not known until all the keys are collocated. For those where it is known (e.g. FixedWindows) this would be possible; it's just that it hasn't been an important enough optimization for any runner to implement yet. – Poignant
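
A hedged sketch of the workaround described in the comments above: fold the window start into the key before windowing, so each (key, window) pair becomes its own unit of parallelism. The element fields (match_id, timestamp as numeric seconds) and the helper name are hypothetical:

# Hypothetical helper: derive the window start from the event timestamp and
# make it part of the key. Assumes the window boundaries are known up front,
# i.e. this does not work for windowing functions like sessions.
def key_by_window_fn(element, window_size):
    window_start = element['timestamp'] - (element['timestamp'] % window_size)
    return ((element['match_id'], window_start), element)

keyed = bboxs | 'KeyByWindow' >> beam.Map(
    key_by_window_fn, FEATURE_WINDOWS['goal']['window_size'])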

It looks like you do not necessarily need the GroupByKey, because you are always grouping on the same key. Instead, you could use CombineGlobally to append all the elements inside the window, rather than a GroupByKey with always the same key.

combined = values | beam.CombineGlobally(append_fn).without_defaults()
combined | beam.ParDo(PostProcessFn())

I am not sure how the load distribution works when using CombineGlobally, but since it does not process key/value pairs, I would expect a different mechanism to handle the load distribution.
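
For completeness, a minimal sketch of what append_fn could be, written here as a Beam CombineFn that collects the window's elements into one list (an assumption; the snippet above does not show its implementation):

import apache_beam as beam

class AppendFn(beam.CombineFn):
    """Collects every element in the window into a single list."""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for accumulator in accumulators:
            merged.extend(accumulator)
        return merged

    def extract_output(self, accumulator):
        return accumulator

It would then be used as beam.CombineGlobally(AppendFn()).without_defaults() in place of the plain append_fn callable above.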

Misesteem answered 22/2, 2019 at 9:7 Comment(0)
