External API call in Apache Beam Dataflow

I have a use case where I read newline-delimited JSON elements stored in Google Cloud Storage and process each one. While processing each JSON element, I have to call an external API for de-duplication, to check whether that element was discovered previously. I'm doing a ParDo with a DoFn on each JSON element.

I haven't seen any online tutorial showing how to call an external API endpoint from an Apache Beam DoFn running on Dataflow.

I'm using the Java SDK of Beam. Some of the tutorials I studied explained this using startBundle and finishBundle, but I'm not clear on how to use them.

Twenty answered 17/11, 2019 at 17:28 Comment(2)
Is this a streaming pipeline or a batch pipeline? - Roumell
It's a batch pipeline. - Twenty

If you need to check for duplicates in external storage for every JSON record, you can still use a DoFn for that. There are several annotations, such as @Setup, @StartBundle, and @FinishBundle, that can be used to annotate methods in your DoFn.

For example, if you need to instantiate a client object to send requests to your external database, you might want to do this in a @Setup method (which plays a role similar to a POJO constructor) and then use this client object in your @ProcessElement method.

Let's consider a simple example:

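import org.apache.beam.sdk.transforms.DoFn;

// Nested inside your pipeline class. MyClient and Record are placeholders
// for your own client and element types.
static class MyDoFn extends DoFn<Record, Record> {

    // One client per DoFn instance. It is transient so that it is not
    // serialized with the DoFn, and not static so that tearing down one
    // instance does not close a client that other instances still use.
    private transient MyClient client;

    @Setup
    public void setup() {
        // called once per DoFn instance, before it processes any bundle
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates; emit only records not seen before
        if (!client.isRecordExist(r.id())) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        // release the connection when the DoFn instance is discarded
        if (client != null) {
            client.close();
            client = null;
        }
    }
}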

Also, to avoid making a remote call for every record, you can collect the records of a bundle in an internal buffer (Beam splits input data into bundles) and check for duplicates in batch mode, if your client supports this. For this purpose, you can use methods annotated with @StartBundle and @FinishBundle, which are called right before and right after a Beam bundle is processed, respectively.
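As a rough illustration of that buffering idea, here is a minimal plain-Java sketch of the logic that would sit behind the three lifecycle methods. It is not Beam code: `BatchDedupClient`, `findExisting`, and `DedupBuffer` are hypothetical names, and it assumes the external service offers some batch lookup that returns the IDs it already knows. For brevity it buffers record IDs only, where a real DoFn would buffer the records themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical batch-capable client: returns the subset of ids the external system already knows. */
interface BatchDedupClient {
    Set<String> findExisting(List<String> ids);
}

/** Buffers record ids and checks them against the external system one batch at a time. */
class DedupBuffer {
    private final BatchDedupClient client;
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();

    DedupBuffer(BatchDedupClient client, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.client = client;
        this.maxBatchSize = maxBatchSize;
    }

    /** Would be called from the @StartBundle method: start each bundle with an empty buffer. */
    void start() {
        buffer.clear();
    }

    /** Would be called from @ProcessElement: buffer the id and flush once the batch is full. */
    void add(String id, Consumer<String> emit) {
        buffer.add(id);
        if (buffer.size() >= maxBatchSize) {
            flush(emit);
        }
    }

    /** Would be called from @FinishBundle: flush whatever is still buffered. */
    void finish(Consumer<String> emit) {
        flush(emit);
    }

    /** One remote call per batch; emits only the ids the external system does not know yet. */
    private void flush(Consumer<String> emit) {
        if (buffer.isEmpty()) {
            return;
        }
        Set<String> existing = client.findExisting(new ArrayList<>(buffer));
        for (String id : buffer) {
            if (!existing.contains(id)) {
                emit.accept(id);
            }
        }
        buffer.clear();
    }
}
```

Capping the batch size keeps memory bounded, since a bundle in a batch pipeline can be large. One thing to keep in mind when wiring this into a real DoFn: output from a @FinishBundle method goes through FinishBundleContext.output(value, timestamp, window), so the buffer would also need to remember each element's timestamp and window (in a batch pipeline with the default global window this is usually straightforward).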

For more complicated examples, I'd recommend taking a look at the Sink implementations in different Beam IOs, such as KinesisIO.

Demodena answered 21/11, 2019 at 14:35 Comment(0)

There is an example of calling an external system in batches using a stateful DoFn in the following blog post, which might be helpful: https://beam.apache.org/blog/2017/08/28/timely-processing.html

Convince answered 20/11, 2019 at 9:59 Comment(0)
