How to supply parameters to a composite transform in Apache Beam?

I am using the Python SDK of Apache Beam.

I have a few transform steps that I want to make reusable, which points me toward writing a custom composite transform like this:

class MyCompositeTransform(beam.PTransform):
    def expand(self, pcoll, arg1, kwarg1=u'default'):
        result = (pcoll
                  | 'Step 1' >> beam.Map(lambda f: SomeFn(f, arg1))
                  | 'Last step' >> beam.Map(lambda f: SomeOtherFn(f, kwarg1))
                  )
        return result

What I want is to supply extra parameters arg1 and kwarg1, which are needed by the transforms inside. But I do not know whether this is a valid approach, nor how to use such a transform in a pipeline.

Can someone please point me in the right direction?

Blakney answered 19/12, 2018 at 6:36

You can supply parameters via the PTransform constructor. The parameters can also take the form of side inputs (i.e. data output by another transform). Here is an example that uses both a "normal" parameter and a side input.

from typing import Dict, Any, Iterable
import apache_beam as beam


class MyCompositeTransform(beam.PTransform):

    def __init__(self, my_arg, my_side_input):
        super().__init__()
        self.my_arg = my_arg
        self.my_side_input = my_side_input

    @staticmethod
    def transform(
        element: Dict[str, Any], my_arg: int, my_side_input: Iterable[int]
    ) -> Dict[str, Any]:
        # Placeholder for the per-element logic that combines `element`,
        # `my_arg`, and the materialized side input.
        pass

    def expand(self, pcoll):
        return pcoll | "MyCompositeTransform" >> beam.Map(
            MyCompositeTransform.transform,
            self.my_arg,
            beam.pvalue.AsIter(self.my_side_input),
        )
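
You would then apply it in a pipeline like any other transform, for example (a minimal sketch; the input data and the side-input PCollection here are assumptions):

import apache_beam as beam

with beam.Pipeline() as p:
    # Hypothetical main input and side input.
    main = p | "Main" >> beam.Create([{"id": 1}, {"id": 2}])
    side = p | "Side" >> beam.Create([10, 20, 30])
    result = main | MyCompositeTransform(my_arg=42, my_side_input=side)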

Use beam.pvalue to define how the side input is passed to the transform, e.g. as a single value (AsSingleton), a lazy iterable (AsIter), or a materialized Python list (AsList).
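
For example, given a PCollection named params (a hypothetical name):

beam.pvalue.AsSingleton(params)  # exactly one element, passed as a plain value
beam.pvalue.AsIter(params)       # a lazy iterable over the elements
beam.pvalue.AsList(params)       # the elements materialized as a Python list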

Additional examples from Beam itself (see the PTransform subclasses in apache_beam.transforms.stats): https://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/stats.html

Fiddlehead answered 4/5, 2020 at 1:3
Great answer, it should be accepted: it answers the question, it is straight to the point, it provides a clear example, and it also provides useful side input (pun intended) on how to use beam.pvalue to consume the data correctly in the DoFn. – Kial

In general you cannot dynamically pass additional parameters to transforms at runtime the way you described. When you run the controller program that constructs the pipeline, the structure of the pipeline is serialized and sent to a fleet of workers, which then execute it in parallel. Those workers don't have access to your controller program; they only get the pipeline structure and the actual code of your ParDos.

One way to dynamically parameterize the execution is to supply the extra data as additional inputs: create another PCollection populated with the parameter values and join it with the main PCollection, for example via side inputs or CoGroupByKey. A minimal sketch of the side-input approach follows below.
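
A minimal sketch of that idea using a side input (the names and element shapes here are assumptions):

import apache_beam as beam

with beam.Pipeline() as p:
    # A one-element PCollection carrying the "parameters".
    params = p | 'Params' >> beam.Create([{'threshold': 5}])
    main = p | 'Main' >> beam.Create([3, 7, 9])
    filtered = main | 'Filter' >> beam.Filter(
        lambda x, cfg: x > cfg['threshold'],
        cfg=beam.pvalue.AsSingleton(params),  # delivered to every worker
    )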

If you're looking at Cloud Dataflow, then you might also consider pipeline templates with ValueProviders; I'm not sure whether those are available in Python or on non-Dataflow runners, though.
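
For reference, in the Python SDK a ValueProvider option can be declared roughly like this (a minimal sketch; the option name --my_param is hypothetical):

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Declared as a ValueProvider: the value is resolved when the
        # templated pipeline actually runs, not when the template is built.
        parser.add_value_provider_argument('--my_param', type=str, default='none')

The value is then read at runtime by calling .get() on the provider, e.g. from within a DoFn's process method.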

Myriagram answered 27/12, 2018 at 19:42
