Context
I'm working with a streaming pipeline that has a protobuf data source in Pub/Sub. I wish to parse this protobuf into a Python dict because the data sink requires the input to be a collection of dicts. I have successfully developed a protobuf parser by initializing the protobuf message in the process function of the DoFn.
Why a Generic Protobuf Parser is Needed
However, I wanted to know whether it is possible to make a generic ProtobufParser DoFn in Beam. A generic DoFn is useful from an engineering perspective because it avoids re-implementing existing functions and enables code reuse. In Java, I know that we can use generics, so implementing this generic ProtobufParser in Java is relatively easy. Since Python functions are first-class objects, I was wondering whether it's possible to pass a protobuf schema class (not the message instance object) into a DoFn. I tried to do this, but I kept failing.
Successful Parser with a Caveat: Not Generalizable
Below is my current, working protobuf parser. The protobuf message is initialized inside the process function.
import apache_beam as beam


class ParsePubSubProtoToDict(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Imported locally inside process() so the import happens on the worker.
        from datapipes.protos.data_pb2 import DataSchema
        from google.protobuf.json_format import MessageToDict

        message = DataSchema()
        message.ParseFromString(element)  # element is the raw Pub/Sub payload (bytes)
        obj = MessageToDict(message, preserving_proto_field_name=True)
        yield obj
While the protobuf DoFn parser above works, it isn't generic across protobuf schemas: every different protobuf schema requires re-implementing a new DoFn parser.
My Attempts
To make the parser generalizable to all protobuf schemas, I tried to pass the protobuf schema, which is generated as a class in Python, to the DoFn.
class ParsePubSubProtoToDict(beam.DoFn):
    def __init__(self, proto_class):
        # Store the generated protobuf message class (not an instance).
        self.proto_class = proto_class

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self.proto_class()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)
        yield obj


def run_pubsub_to_gbq_pipeline(argv):
    ...
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
         'Print Result' >> beam.Map(lambda x: print_data(x)))
I also tried other similar techniques, but all my attempts fail with the same error message:
pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
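For context on what this message means: the standard `pickle` module does not serialize a class's code. It stores a reference (the class's `__module__` plus its qualified name) and, when pickling, imports that module to check that it yields the same class object. The class repr in the message, `<class 'data_pb2.DataSchema'>`, indicates that the generated class reports its `__module__` as `data_pb2`, even though it is imported as `datapipes.protos.data_pb2`. The minimal, stdlib-only sketch below reproduces that check without protobuf or Beam; the module objects are hypothetical stand-ins, and it is an illustration of `pickle`'s behaviour, not a confirmed diagnosis of the pipeline.

```python
import pickle
import sys
import types

# Hypothetical stand-ins for the package the pipeline imports from. Parent
# packages are registered too so the dotted import path resolves.
for name in ("datapipes", "datapipes.protos", "datapipes.protos.data_pb2"):
    sys.modules[name] = types.ModuleType(name)
data_pb2 = sys.modules["datapipes.protos.data_pb2"]

# A plain class standing in for the generated message class. Like the class in
# the error, it reports "data_pb2" as its module, which is not importable here.
DataSchema = type("DataSchema", (), {})
DataSchema.__module__ = "data_pb2"
data_pb2.DataSchema = DataSchema


def try_pickle(obj):
    """Return the pickled bytes, or the PicklingError if pickling fails."""
    try:
        return pickle.dumps(obj)
    except pickle.PicklingError as exc:
        return exc


# pickle records (__module__, __qualname__) and imports that module to verify
# it can find the same object; importing "data_pb2" fails, so this errors out.
result_bad = try_pickle(DataSchema)
print(type(result_bad).__name__, result_bad)

# Once __module__ names a module that resolves, the same class pickles fine,
# and the payload holds only the reference, not the class definition.
DataSchema.__module__ = "datapipes.protos.data_pb2"
result_ok = try_pickle(DataSchema)
print(pickle.loads(result_ok) is DataSchema)
```

This only shows how the standard `pickle` module treats classes; it does not show how Beam or dill handle the same class.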
From this error message, I have two hypotheses about why the problem occurs:
- The protobuf schema class is unserializable. However, this hypothesis is probably wrong: while I'm aware that pickle can't serialize the protobuf schema class, I was able to serialize it when I used dill. Beyond this, I'm still unsure how DoFn in Python Beam implements serialization (e.g., when it uses dill or pickle, and what format an object needs to have to be serializable and compatible with a DoFn).
- An import error in the DoFn class. I have previously run into several ImportError problems with Python Beam due to function/class scope and Dataflow workers. To solve them, I had to import the package locally in the function where it's needed rather than globally in the module. So perhaps, when the protobuf schema class is passed to the DoFn, the schema import actually happens outside the DoFn, and the DoFn therefore can't resolve the class name correctly?
My questions would be:
- Why does this error occur, and how can I resolve this error?
- Is it possible to pass a protobuf schema class? Or is there a better way to implement a generic protobuf-to-Python-dict parser DoFn without passing a protobuf schema class to the DoFn?
- How does DoFn work in Python, and how do I ensure that the object passed to the DoFn's constructor (__init__) is serializable? Is there a Serializable class in Beam that I could inherit from to make my unserializable objects serializable?
Thanks a lot! Your help would be greatly appreciated.