Apache Beam in Dataflow: Large Side Input

This is most similar to this question.

I am creating a pipeline in Dataflow 2.x that takes streaming input from a Pubsub queue. Every single message that comes in needs to be matched against a very large dataset that comes from Google BigQuery and have all the relevant values attached to it (based on a key) before being written to a database.

The trouble is that the mapping dataset from BigQuery is very large - any attempt to use it as a side input fails with the Dataflow runners throwing the error "java.lang.IllegalArgumentException: ByteString would be too long". I have attempted the following strategies:

1) Side input

  • As stated, the mapping data is (apparently) too large to do this. If I'm wrong here or there is a work-around for this, please let me know because this would be the simplest solution.

2) Key-Value pair mapping

  • In this strategy, I read the BigQuery data and Pubsub message data in the first part of the pipeline, then run each through ParDo transformations that change every value in the PCollections to KeyValue pairs. Then, I run a Merge.Flatten transform and a GroupByKey transform to attach the relevant mapping data to each message.
  • The trouble here is that streaming data requires windowing to be merged with other data, so I have to apply windowing to the large, bounded BigQuery data as well. It also requires that the windowing strategies are the same on both datasets. But no windowing strategy for the bounded data makes sense, and the few windowing attempts I've made simply send all the BQ data in a single window and then never send it again. It needs to be joined with every incoming Pubsub message (a rough sketch of this approach appears after this list).

3) Calling BQ directly in a ParDo (DoFn)

  • This seemed like a good idea - have each worker declare a static instance of the map data. If it's not there, then call BigQuery directly to get it. Unfortunately this throws internal errors from BigQuery every time (as in the entire message just says "Internal error"). Filing a support ticket with Google resulted in them telling me that, essentially, "you can't do that".
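
A rough sketch of strategy 2 as described above (a minimal reconstruction, reusing the BQToKeyValueDoFn that appears later in the question; MessageToKeyValueDoFn, the subscription variable, and the one-minute window are placeholders):

    // Strategy 2: key both inputs, flatten them into one PCollection, then GroupByKey.
    // Both inputs must share a windowing strategy before the GroupByKey; the bounded
    // BigQuery data lands in a single window and is never emitted again for later
    // Pubsub messages, which is the problem described above.
    PCollection<KV<String, TableRow>> keyedBqRows = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()));

    PCollection<KV<String, TableRow>> keyedMessages = pipeline
            .apply("ReadMessages", PubsubIO.readMessages().fromSubscription(subscription))
            .apply("MsgToKeyValPairs", ParDo.of(new MessageToKeyValueDoFn())); // placeholder DoFn

    PCollection<KV<String, Iterable<TableRow>>> grouped = PCollectionList
            .of(keyedBqRows).and(keyedMessages)
            .apply(Flatten.pCollections())
            .apply(Window.<KV<String, TableRow>>into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(GroupByKey.create());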

It seems this task doesn't really fit the "embarrassingly parallelizable" model, so am I barking up the wrong tree here?

EDIT:

Even when using a high-memory machine in Dataflow and attempting to make the side input into a map view, I get the error java.lang.IllegalArgumentException: ByteString would be too long.

Here is an example (pseudo) of the code I'm using:

    Pipeline pipeline = Pipeline.create(options);

    // Read the mapping table from BigQuery, key each row, and expose it as a Map-valued side input.
    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    // Enrich each incoming Pubsub message by looking up its key in the side-input map.
    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != null) {
                LOG.info("holyWowItWOrked");
                c.output(new TableRow());
            } else {
                LOG.info("noSideInputDataHere");
            }
        }
    }).withSideInputs(mapData));

The pipeline throws the exception and fails before logging anything from within the ParDo.

Stack trace:

java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
        com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
        com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Foxworth answered 27/11, 2017 at 19:09 Comment(13)
Bouffard: What kind of side input view are you using? Can you share an example of how you were using it?
Bouffard: Have you considered using a stateful ParDo? If you were processing in the global window, this would allow you to store the value from BigQuery in state and use it to process each value that arrives from the other stream. You would need to use the same Merge.Flatten approach you mentioned, since stateful DoFns only work with a single input collection.
Foxworth: Regarding your first comment @BenChambers: the side input is a large mapping table. Each row has a key string in it that may match the data in the incoming Pubsub message. The mapping dataset changes each week but currently is ~40 million rows (about 10 GB), and for the course of the week it is completely static and unchanging. I'm looking at the stateful ParDo documentation now and seeing if it could be viable...
Bouffard: For side inputs, are you using View.asSingleton, View.asMap, etc.? For example, View.asSingleton will take a PCollection with a single element and make it visible to the ParDo. View.asMap will take a PCollection<KV<K, V>> and make it available as a Map<K, V>, but will only read the keys you need.
Foxworth: I was using View.asIterable to go through each row to check for a match.
Bouffard: Using View.asIterable means that for every element you need to read (potentially) all 10 GB. That explains some performance problems. Would it be possible to use View.asMap or View.asMultimap? This would require you to associate each row with a lookup key, but then you'd be able to query those items without reading everything.
Foxworth: Do you mean to say that I can take the 10 GB static side input and only bring the needed values into the side input data for each incoming Pubsub message?
Bouffard: If they are keyed, yes. You take a PCollection<KV<K, V>>, apply View.asMap or View.asMultimap, and get back a PCollectionView<Map<K, V>> or PCollectionView<Map<K, Iterable<V>>>. The side input is written out using an indexed format, so when you do context.sideInput(view).get(someKey) it only needs to read a subset of the entire side-input data.
Foxworth: Sounds promising. I will try that.
Foxworth: Putting the large side input into key-value pairs and making the view using View.asMap, even when using n1-highmem-16 machines, still throws the java.lang.IllegalArgumentException: ByteString would be too long error. I will edit the question with details from this attempt.
Bouffard: Can you share the stack trace from the IllegalArgumentException you are getting? I want to understand where the ByteString that would be too long is occurring.
Foxworth: Any thoughts on this? I've had a support ticket open with Google and I sent an email to the Apache Beam devs, with no luck.
Malfeasance: Any news about this? Facing the same problem.

Check out the section called "Pattern: Streaming mode large lookup tables" in Guide to common Cloud Dataflow use-case patterns, Part 2. It might be the only viable solution since your side input doesn't fit into memory.

Description:

A large (in GBs) lookup table must be accurate, and changes often or does not fit in memory.

Example:

You have point of sale information from a retailer and need to associate the name of the product item with the data record which contains the productID. There are hundreds of thousands of items stored in an external database that can change constantly. Also, all elements must be processed using the correct value.

Solution:

Use the "Calling external services for data enrichment" pattern but rather than calling a micro service, call a read-optimized NoSQL database (such as Cloud Datastore or Cloud Bigtable) directly.

For each value to be looked up, create a Key Value pair using the KV utility class. Do a GroupByKey to create batches of the same key type to make the call against the database. In the DoFn, make a call out to the database for that key and then apply the value to all values by walking through the iterable. Follow best practices with client instantiation as described in "Calling external services for data enrichment".
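
A rough sketch of that lookup step, using Cloud Bigtable as the read-optimized store (a minimal sketch, not code from the guide: keyedRecords is assumed to already be a windowed PCollection<KV<String, TableRow>> keyed by productID, and the project, instance, table, column family, and column names are placeholders):

    // Batch elements by lookup key, then make one Bigtable read per key and
    // apply the looked-up value to every element in that batch.
    PCollection<TableRow> enriched = keyedRecords
            .apply("BatchByKey", GroupByKey.create())
            .apply("LookupAndEnrich", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
                // One client per DoFn instance, created in @Setup and closed in @Teardown,
                // per the client-instantiation guidance referenced above.
                private transient BigtableDataClient client;

                @Setup
                public void setup() throws Exception {
                    client = BigtableDataClient.create("my-project", "my-instance");
                }

                @ProcessElement
                public void processElement(ProcessContext c) {
                    // One read for the whole batch of elements that share this key.
                    Row lookup = client.readRow("product-lookup", c.element().getKey());
                    for (TableRow record : c.element().getValue()) {
                        TableRow out = record.clone();
                        if (lookup != null && !lookup.getCells("attrs", "name").isEmpty()) {
                            out.set("productName",
                                    lookup.getCells("attrs", "name").get(0).getValue().toStringUtf8());
                        }
                        c.output(out);
                    }
                }

                @Teardown
                public void teardown() {
                    if (client != null) {
                        client.close();
                    }
                }
            }));

Grouping by key keeps the call volume to one read per distinct key per window rather than one per element; if batching isn't needed, the same @Setup/@Teardown client lifecycle works in a plain per-element DoFn.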

Other relevant patterns are described in Guide to common Cloud Dataflow use-case patterns, Part 1:

  • Pattern: Slowly-changing lookup cache
  • Pattern: Calling external services for data enrichment
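
For the slowly-changing lookup cache pattern, a minimal sketch (assuming a reduced projection of the lookup data that does fit in memory; the hourly tick and the readLookupData() helper are placeholders for however the source is re-read):

    // Periodically rebuild a map side input from a ticker source; each trigger firing
    // replaces the singleton map that downstream ParDos see.
    PCollectionView<Map<String, String>> lookupCache = pipeline
            .apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
            .apply("GlobalWindow", Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply("RefreshLookup", ParDo.of(new DoFn<Long, Map<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // readLookupData() is a placeholder for re-reading the lookup source
                    // (for example a small BigQuery query or a file on GCS).
                    c.output(readLookupData());
                }
            }))
            .apply("AsSideInput", View.asSingleton());

The refreshed view is then passed to the consuming ParDo with .withSideInputs(lookupCache) and read with c.sideInput(lookupCache), the same way mapData is used in the question.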
Carbarn answered 24/12, 2018 at 19:21 Comment(0)
