I am using the Python SDK of Apache Beam.
I have a few transform steps that I want to make reusable, which led me to write a custom composite transform like this:
```python
class MyCompositeTransform(beam.PTransform):
    def expand(self, pcoll, arg1, kwarg1=u'default'):
        result = (pcoll
                  | 'Step 1' >> beam.Map(lambda f: SomeFn(f, arg1))
                  | 'Last step' >> beam.Map(lambda f: SomeOtherFn(f, kwarg1))
                  )
        return result
```
What I want is to supply extra parameters, arg1 and kwarg1, that are needed by the transforms inside the composite. However, I do not know whether this is a valid approach, nor how to use it in a pipeline.
Can someone please point me in the right direction?