How to publish to Pub/Sub from Dataflow in batch (efficiently)?
B

1

6

I want to publish messages to a Pub/Sub topic with some attributes thanks to Dataflow Job in batch mode.

My dataflow pipeline is write with python 3.8 and apache-beam 2.27.0

It works with the @Ankur solution here : https://mcmap.net/q/1735583/-why-does-custom-python-object-cannot-be-used-with-pardo-fn

But I think it could be more efficient with a shared Pub/Sub Client : https://mcmap.net/q/1735583/-why-does-custom-python-object-cannot-be-used-with-pardo-fn

However an error occurred:

return StockUnpickler.find_class(self, module, name) AttributeError: Can't get attribute 'PublishFn' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.8/site-packages/dataflow_worker/start.py'>

Questions:

  1. Would the shared publisher implementation improve beam pipeline performance?
  2. Is there another way to avoid pickling error on my shared publisher client ?

My Dataflow Pipeline :

import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from google.cloud.pubsub_v1 import PublisherClient

import json
import argparse
import re
import logging


class PubsubClient(PublisherClient):
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)


# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PubsubClient(batch_settings)
        super().__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(
            topic=element["topic"],
            data=json.dumps(element["data"]).encode("utf-8"),
            **element["attributes"],
        )

        return future.result()


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--source_table_id",
        dest="source_table_id",
        default="",
        help="BigQuery source table <project>.<dataset>.<table> with columns (topic, attributes, data)",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).

    pipeline_options = PipelineOptions(pipeline_args)
    # pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    bq_source_table = known_args.source_table_id
    bq_table_regex = r"^(?P<PROJECT_ID>[a-zA-Z0-9_-]*)[\.|\:](?P<DATASET_ID>[a-zA-Z0-9_]*)\.(?P<TABLE_ID>[a-zA-Z0-9_-]*)$"

    regex_match = re.search(bq_table_regex, bq_source_table)

    if not regex_match:
        raise ValueError(
            f"Bad BigQuery table id : `{bq_source_table}` please match {bq_table_regex}"
        )

    table_ref = bigquery.TableReference(
        projectId=regex_match.group("PROJECT_ID"),
        datasetId=regex_match.group("DATASET_ID"),
        tableId=regex_match.group("TABLE_ID"),
    )

    with beam.Pipeline(options=pipeline_options) as p:

        (
            p
            | "ReadFromBqTable" # 
            >> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True) # Each row contains : topic / attributes / data
            | "PublishRowsToPubSub" >> beam.ParDo(PublishFn())
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

Bibliotaph answered 26/3, 2021 at 17:21 Comment(5)
Any reason of using your own Publisher in a ParDo, rather than the one from Beam? It's not recommended tl use it in a ParDo. Also, if you want to do it in a ParDo, I suggest you use the setup method.Olivine
I want to run this pipeline in batch mode. The PubsubIO from Beam works only in streaming.Bibliotaph
The ParDo seems to be recommended : beam.apache.org/documentation/io/developing-io-overview/#sinksBibliotaph
You are completely right, I wasn't aware the writes to PS were not available in Python Batch, sorry. They are available in Java, though (that's why I was confused). Given the pipeline does not look to require anything Python-specific, have you considered using Java?Olivine
+1 to all of Iñigo's points. To avoid the pickling error, you can create the client in a setup() function of your DoFn class. I don't think using a shared client will help (I don't know if the pubsub client is thread-safe either)Contrive
Y
8

After fussing with this a bit, I think I have an answer that works consistently and is, if not world-beatingly performant, at least tolerably usable:

import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import (
    BatchSettings,
    LimitExceededBehavior,
    PublishFlowControl,
    PublisherOptions,
)


class PublishClient(PublisherClient):
    """
    You have to override __reduce__ to make PublisherClient pickleable 😡 😤 🤬

    Props to 'Ankur' and 'Benjamin' on SO for figuring this part out; god knows
    I would not have...
    """

    def __reduce__(self):
        return self.__class__, (self.batch_settings, self.publisher_options)


class PubsubWriter(beam.DoFn):
    """
    beam.io.gcp.pubsub does not yet support batch operations, so
    we do this the hard way.  it's not as performant as the native
    pubsubio but it does the job.
    """

    def __init__(self, topic: str):
        self.topic = topic
        self.window = beam.window.GlobalWindow()
        self.count = 0

    def setup(self):
        batch_settings = BatchSettings(
            max_bytes=1e6,  # 1MB
            # by default it is 10 ms, should be less than timeout used in future.result() to avoid timeout
            max_latency=1,
        )

        publisher_options = PublisherOptions(
            enable_message_ordering=False,
            # better to be slow than to drop messages during a recovery...
            flow_control=PublishFlowControl(limit_exceeded_behavior=LimitExceededBehavior.BLOCK),
        )

        self.publisher = PublishClient(batch_settings, publisher_options)

    def start_bundle(self):
        self.futures = []

    def process(self, element: PubsubMessage, window=beam.DoFn.WindowParam):
        self.window = window
        self.futures.append(
            self.publisher.publish(
                topic=self.topic,
                data=element.data,
                **element.attributes,
            )
        )

    def finish_bundle(self):
        """Iterate over the list of async publish results and block
        until all of them have either succeeded or timed out.  Yield
        a WindowedValue of the success/fail counts."""

        results = []
        self.count = self.count + len(self.futures)
        for fut in self.futures:
            try:
                # future.result() blocks until success or timeout;
                # we've set a max_latency of 60s upstairs in BatchSettings,
                # so we should never spend much time waiting here.
                results.append(fut.result(timeout=60))
            except Exception as ex:
                results.append(ex)

        res_count = {"success": 0}
        for res in results:
            if isinstance(res, str):
                res_count["success"] += 1
            else:
                # if it's not a string, it's an exception
                msg = str(res)
                if msg not in res_count:
                    res_count[msg] = 1
                else:
                    res_count[msg] += 1

        logging.info(f"Pubsub publish results: {res_count}")

        yield beam.utils.windowed_value.WindowedValue(
            value=res_count,
            timestamp=0,
            windows=[self.window],
        )

    def teardown(self):
        logging.info(f"Published {self.count} messages")

The trick is that if you call future.result() inside the process() method, you will block until that single message is successfully published, so instead collect a list of futures and then at the end of the bundle make sure they're all either published or definitively timed out. Some quick testing with one of our internal pipelines suggested that this approach can publish 1.6M messages in ~200s.

Yakutsk answered 29/5, 2021 at 19:41 Comment(1)
Found since upgrading from 2.35.0 to 2.39.0 , it now takes a very long time to process 100 messages. It then pauses on the next 100. Just beware that 2.39.0 seems to have an issue. But works with 2.35.0, so thank youBereniceberenson

© 2022 - 2024 — McMap. All rights reserved.