Apache Beam: DoFn.Setup equivalent in Python SDK
What is the recommended way to do expensive one-off initialization in a Beam Python DoFn? The Java SDK has DoFn.Setup, but there doesn't appear to be an equivalent in Beam Python.

Is the best way currently to attach objects to threading.local() in the DoFn initializer?

Coloration answered 28/10, 2018 at 17:40 Comment(1)
Actually threading.local doesn't work either (issues.apache.org/jira/browse/…); globals is probably the best solution right now. – Coloration
Setup and teardown have now been added to the Python SDK and are the recommended way to do expensive one-off initialization in a Beam Python DoFn.
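A minimal sketch of the lifecycle (the class and client names are illustrative; a plain Python class stands in for beam.DoFn so this runs without Beam installed, and the loop at the bottom crudely simulates a runner):

```python
class ClientDoFn:
    """Sketch of a DoFn using setup/teardown; beam.DoFn replaced by a
    plain class so the example runs without Beam installed."""

    def setup(self):
        # Called once per DoFn instance on the worker, before any bundle:
        # the right place for expensive one-off initialization.
        self.client = {"connected": True}  # hypothetical expensive client

    def process(self, element):
        yield (element, self.client["connected"])

    def teardown(self):
        # Called once when the instance is discarded; release resources here.
        self.client = None

# Crude simulation of a runner driving one instance over two bundles:
fn = ClientDoFn()
fn.setup()
out = [x for bundle in ([1, 2], [3]) for e in bundle for x in fn.process(e)]
fn.teardown()
print(out)  # [(1, True), (2, True), (3, True)]
```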

Omsk answered 5/7, 2019 at 19:16 Comment(2)
Currently doesn't work w/ DirectRunner: issues.apache.org/jira/browse/BEAM-7885 – Legible
Of course setup/teardown is the right way to do it (and it's much cleaner code-wise), but I'm curious: do they actually do anything different compared to the old workaround of having a lazy-initialized class or global object? – Sheepshead
Dataflow Python is not particularly transparent about the optimal method for initializing expensive objects. There are a few mechanisms by which objects can be instantiated infrequently (it is currently not ideal to perform exactly once initialization). Below are outlined some of the experiments I have run and conclusions I have come to. Hopefully someone from the Beam community can help correct me wherever I have strayed.

__init__

Although the __init__ method can be used to initialize an expensive object exactly once, this initialization does not happen on the Worker machines. The object will need to be serialized in order to be sent off to the Worker, which, for large objects such as TensorFlow models, can be quite unwieldy or not work at all. Furthermore, since this object will be serialized and sent over a wire, it is not secure to perform initializations here, as payloads can be intercepted. The recommendation is against using this method.
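To make the serialization cost concrete, here is a small sketch using plain pickle (Beam pickles DoFns similarly when shipping them to workers; the class and attribute names are made up for illustration):

```python
import pickle

class ModelDoFn:
    """Plain class standing in for beam.DoFn; attribute names are
    illustrative."""

    def __init__(self):
        # Runs on the machine that builds the pipeline, not on the worker;
        # everything assigned here must be pickled and sent over the wire.
        self.weights = [0.0] * 1_000_000  # stand-in for a large model

payload = pickle.dumps(ModelDoFn())
print(len(payload) > 1_000_000)  # True: the whole model travels with the DoFn
```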

start_bundle()

Dataflow processes data in discrete groups that it calls bundles. These are fairly well defined in batch jobs, but in streaming they depend on the throughput. There is no mechanism for configuring how Dataflow creates its bundles; their size is entirely dictated by Dataflow. The start_bundle() method is called on the Worker and can be used to initialize state; however, experiments show that in a streaming context this method is called more frequently than desired, so expensive re-initializations would happen quite often.
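A sketch of why start_bundle is a poor home for expensive work (plain class standing in for beam.DoFn; the bundle loop simulates a runner delivering three small bundles, as a streaming runner might):

```python
class CountingDoFn:
    """beam.DoFn-style sketch; plain class so it runs without Beam."""

    def __init__(self):
        self.bundle_starts = 0  # instrumentation for this sketch only

    def start_bundle(self):
        # Called once per bundle, not once per instance: in streaming,
        # bundles can be tiny, so an expensive load here repeats often.
        self.bundle_starts += 1
        self.client = {"connected": True}  # hypothetical expensive client

    def process(self, element):
        yield element * 2

# Simulate a runner delivering three small bundles:
fn = CountingDoFn()
out = []
for bundle in ([1, 2], [3], [4, 5]):
    fn.start_bundle()
    for e in bundle:
        out.extend(fn.process(e))
print(fn.bundle_starts, out)  # 3 [2, 4, 6, 8, 10]
```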

Lazy initialization

This methodology was suggested by the Beam docs and is somewhat surprisingly the most performant. Lazy initialization means that you create some stateful parameter that you initialize to None, then execute code such as the following:

if self.expensive_object is None:
    self.expensive_object = self.__expensive_initialization()

You can execute this code directly in your process() method. You can also put together helper functions that rely on global state, so that you can write calls such as the following (an example of what this might look like is at the bottom of this post):

self.expensive_object = get_or_initialize_global('expensive_object', self.__expensive_initialization)
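Putting the pattern together, a self-contained sketch of a lazily initialized DoFn (plain class standing in for beam.DoFn; the init counter exists only to show the initialization runs once per instance):

```python
class LazyModelDoFn:
    """Sketch of the lazy-initialization pattern; plain class so it
    runs without Beam installed."""

    def __init__(self):
        # Keep only cheap, picklable state here; the expensive object
        # starts as None and is created on the worker on first use.
        self.expensive_object = None
        self.init_count = 0  # instrumentation for this sketch only

    def __expensive_initialization(self):
        self.init_count += 1
        return {"model": "loaded"}  # stand-in for a slow load

    def process(self, element):
        # Runs on the worker; initializes at most once per instance.
        if self.expensive_object is None:
            self.expensive_object = self.__expensive_initialization()
        yield element

fn = LazyModelDoFn()
out = [x for e in range(5) for x in fn.process(e)]
print(fn.init_count, out)  # 1 [0, 1, 2, 3, 4]
```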

Experiments

The following experiments were run on a job configured to use both start_bundle and the lazy initialization method described above, with appropriate logging to indicate invocation. Messages were published to the appropriate queue at various rates and the results were recorded accordingly.

At a rate of 1 msg/sec over 100s:

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                             100 
LAZY INITIALIZATION                                     25 
TOTAL MESSAGES                                         100 

At a rate of 10 msg/sec over 100s:

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                             942 
LAZY INITIALIZATION                                      3 
TOTAL MESSAGES                                        1000 

At a rate of 100 msg/sec over 100s:

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                            2447 
LAZY INITIALIZATION                                     30 
TOTAL MESSAGES                                       10000 

At a rate of 1000 msg/sec over 100s:

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                            2293 
LAZY INITIALIZATION                                     36 
TOTAL MESSAGES                                      100000 

Takeaways

Although start_bundle works well for high throughput, lazy initialization is nonetheless the most performant by a wide margin regardless of throughput. It is the recommended way of performing expensive initializations on Python Beam. This result is perhaps not too surprising given this quote from the official docs:

Setup - called once per DoFn instance before anything else; this has not been implemented in the Python SDK so the user can work around just with lazy initialization

The fact that it is called a "work around" is not particularly encouraging though, and maybe we can expect something more robust in the near future.

Code Samples

Courtesy of Andreas Jansson:

def get_or_initialize_global(object_key, initialize_expensive_object):
    if object_key in globals():
        expensive_object = globals()[object_key]
    else:
        expensive_object = initialize_expensive_object()
        globals()[object_key] = expensive_object
    return expensive_object  # without this return, callers would get None
Prato answered 29/1, 2019 at 19:40 Comment(4)
Just realized the second test scored better on lazy init than the first; I wonder if this was maybe some sort of warm-up error. The same job was used for all 4 tests. – Prato
Thanks for this. Best piece of information I could find. Have you tried something like lazy initialization inside a method decorated with @property or @lru_cache? – Worthy
Setup and teardown have been added to the Python SDK: github.com/apache/beam/pull/7994 – Omsk
@Omsk I think you should add this as an answer, as it is the correct solution. – Prato
D
1

This sounds like it could be it: https://beam.apache.org/releases/pydoc/2.8.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.start_bundle

Dato answered 28/10, 2018 at 19:16 Comment(2)
Thanks Alex! The start_bundle function runs at the start of a set of items, but is executed multiple times over the DoFn lifetime. The Java SDK has Setup(), StartBundle() and Process() (the difference is discussed here: https://mcmap.net/q/497797/-what-is-the-difference-between-dofn-setup-and-dofn-startbundle), but Python has only start_bundle() and process(). – Coloration
Hmm. Right you are. Even searching for 'teardown' & 'setup' in the apache_beam==2.4.0 source code yielded nothing promising, so maybe it's not supported yet. There is some mention of Teardown Policy in the internal dataflow runner library: github.com/apache/beam/blob/master/sdks/python/apache_beam/… Hopefully it makes its way to us soon. – Dato

© 2022 - 2024 — McMap. All rights reserved.