Why can't a custom Python object be used with a ParDo Fn?

I'm new to using Apache Beam in Python with the Dataflow runner. I'm interested in creating a batch pipeline that publishes to Google Cloud Pub/Sub. I tinkered with the Beam Python APIs and found a solution. However, during my exploration I ran into some interesting problems that made me curious.

1. The Successful Pipeline

Currently, my working Beam pipeline for publishing data from GCS in a batch manner looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )

2. The Unsuccessful Pipelines

Here, I attempted to make the publisher shared across the DoFn. I tried the following approaches.

a. Initializing the publisher in the DoFn's __init__

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initializing the publisher outside the DoFn and passing it to the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts to share the publisher across the DoFn's methods failed with the following error messages:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

and

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
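
For what it's worth, the failure can be reproduced entirely outside Beam. Beam pickles the DoFn (with dill) at pipeline-submission time, and the client holds a grpc channel, a Cython extension type that refuses to be pickled. Below is a minimal sketch of that reproduction, assuming google-cloud-pubsub is installed and default credentials are available so the client can be constructed; it is illustrative, not part of my pipeline:

import pickle

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

try:
    # Beam pickles the DoFn and everything stored on it at submission time;
    # doing the same thing by hand surfaces the identical error.
    pickle.dumps(publisher)
except TypeError as e:
    print(e)  # e.g. "no default __reduce__ due to non-trivial __cinit__"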

My questions are:

  1. Would a shared publisher implementation improve the Beam pipeline's performance? If so, I would like to explore that solution.

  2. Why do the errors occur in my failing pipelines? Is it because I initialize a custom class object and pass it to the DoFn outside the process function? If so, how can I implement a pipeline so that I can reuse a custom object in a DoFn?

Thank you, your help would be greatly appreciated.

Edit: the Solution

Okay, so Ankur has explained why my problem occurs and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for making a custom object shared/reusable in a DoFn:

  1. Make the custom object serializable: this allows the object to be initialized/available during DoFn object creation (in __init__). The object must be serializable because it is serialized during pipeline submission, which is when the DoFn object is created (and __init__ is called). How I achieved this is answered below in my answer, and a generic sketch of the idea follows after this list. I also found that this requirement is documented in the Beam programming guide [1][2].

  2. Initialize non-serializable objects in DoFn methods other than __init__ to avoid serialization, since methods other than __init__ aren't called during pipeline submission. How to accomplish this is explained in Ankur's answer.
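
For option 1 in general terms (not specific to Pub/Sub), a common way to make a custom object picklable while it wraps an unpicklable resource is to control its pickled state with __getstate__/__setstate__. The sketch below is purely illustrative, with made-up names; it is not the code I used:

import pickle


class ReusableClient(object):
    """Hypothetical wrapper around an unpicklable handle (e.g. a network client)."""

    def __init__(self, config):
        self.config = config
        self._handle = self._connect(config)  # stands in for the unpicklable part

    def _connect(self, config):
        # Placeholder for whatever creates the real resource.
        return object()

    def __getstate__(self):
        # Drop the handle; keep only what is needed to rebuild it.
        state = self.__dict__.copy()
        state.pop('_handle', None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._handle = self._connect(self.config)


# Round-trips via __getstate__/__setstate__, so the (possibly unpicklable)
# handle is simply rebuilt on load instead of being pickled.
clone = pickle.loads(pickle.dumps(ReusableClient({'host': 'example'})))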

References:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

Dias answered 24/4, 2019 at 5:21 Comment(0)

The PublisherClient cannot be pickled correctly (see the Python pickle documentation for more on pickling). Initializing the PublisherClient in the process method avoids pickling it.

If the intent is to reuse the PublisherClient, I would recommend initializing it lazily in the process method and storing it on self, as in the following code.

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # Create the client lazily on the worker the first time process runs,
        # then cache it on self so later elements reuse the same client.
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
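
If you prefer not to touch process at all, the same idea of deferring client creation until the code is running on the worker can be expressed in start_bundle (or DoFn.setup on newer SDKs), both of which run after the DoFn has been deserialized. A rough sketch along those lines, not the poster's original code, with the topic path left as a placeholder:

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        self.publisher = None  # nothing unpicklable is stored at submission time

    def start_bundle(self):
        # Runs on the worker after deserialization, so the client is never pickled.
        from google.cloud import pubsub_v1
        if self.publisher is None:
            self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()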
Fidole answered 24/4, 2019 at 7:12 Comment(5)
Hi @Fidole, thank you for the explanation. I see now that the problem is due to pickling. In the Python Beam implementation, is everything pickled except the process function of the DoFn? – Dias
Not exactly, let me explain. PublishFn(...) creates a new PublishFn object, and this object is pickled, including all of its attributes. The process method is just code and will be pickled like any other method of the object, but the object's variables can only be pickled if they themselves support pickling. Long story short, if you had another method foo in PublishFn, it would have been treated just like process and would have been pickled correctly, as process was. – Fidole
Hi @Ankur, since process is also pickled, I have a follow-up question. publisher is a variable inside process in my successful pipeline (or an attribute of PublishFn in your solution), but why can we initialize it to a PublisherClient instance only in process and not in __init__? Is it because an unpicklable variable won't get pickled if we initialize it inside the process function (or any other DoFn method), whereas that isn't the case with __init__? – Dias
__init__ code is executed at object creation, which happens before pickling; the object is pickled afterwards. If the PublisherClient is part of the object at pipeline-submission time, it is pickled too, which is what causes the failure. process is not called during pipeline submission, so the client is not yet part of the object and hence is not pickled. – Fidole
Thanks a lot! I understand the problem better now. So based on what I understand, there are two ways to make a custom object shared/reusable in a DoFn: 1) make the custom object serializable, which allows it to be initialized/available during DoFn object creation (__init__); it will then also be pickled during pipeline submission; 2) initialize non-serializable objects in DoFn methods other than __init__ to avoid pickling, since those methods aren't called during pipeline submission. – Dias

Thanks to Ankur, I discovered that this problem is due to pickling in Python. I then tried to isolate it by first solving the problem of pickling the PublisherClient, and ended up with a way to share the PublisherClient across the DoFn in Beam.

In Python, we can pickle custom objects with the dill package, and I realized that this package is already used in Beam's Python implementation for pickling objects. So I tried to troubleshoot the problem and discovered this error:

TypeError: no default __reduce__ due to non-trivial __cinit__

Then, I tried to fix this error, and my pipeline now works!

Below is the solution:

import apache_beam as beam
from google.cloud.pubsub_v1 import PublisherClient


class PubsubClient(PublisherClient):
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)

# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

        from google.cloud import pubsub_v1
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PubsubClient(batch_settings=batch_settings)
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))

        return future.result()

# ...the run_gcs_to_pubsub is the same as my successful pipeline

The solution works like this: first, I subclass PublisherClient and implement the __reduce__ method myself. Note that because I only used the batch_settings property to initialize my PublisherClient, that property is enough for my __reduce__ method to reconstruct the client. I then use this modified client in the DoFn's __init__.
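
As a quick sanity check outside the pipeline, the subclass should now round-trip through pickle, which is essentially what Beam needs at submission time. A small illustrative snippet, assuming the definitions above and that default GCP credentials are available so the client can be re-created on load:

import pickle

from google.cloud import pubsub_v1

client = PubsubClient(batch_settings=pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,   # One second
))

# __reduce__ tells pickle to rebuild the client from its batch settings alone,
# so the round trip never touches the unpicklable grpc channel.
restored = pickle.loads(pickle.dumps(client))
print(type(restored).__name__)  # PubsubClient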

Hopefully, with this new solution, my pipeline will gain a performance improvement.

Dias answered 24/4, 2019 at 15:47 Comment(0)

Thanks for your solution! But I think you could do this with monkey patching instead of creating a subclass, like below:

from google.cloud.pubsub_v1 import PublisherClient


def my_reduce(self: PublisherClient):
    return self.__class__, (self.batch_settings,)

PublisherClient.__reduce__ = my_reduce

and then keep using the PublisherClient class directly, instead of creating a new class:

# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

        from google.cloud import pubsub_v1
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PublisherClient(batch_settings=batch_settings)
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))

        return future.result()
Alligator answered 9/5, 2023 at 15:8 Comment(0)
