If you need to check every JSON record for duplicates in external storage, you can still use a `DoFn` for that. Beam provides several lifecycle annotations, such as `@Setup`, `@StartBundle`, `@FinishBundle` and `@Teardown`, that you can put on methods of your `DoFn`.
For example, if you need to instantiate a client object to send requests to your external database, you might want to do this in a `@Setup` method (Beam calls it once per `DoFn` instance, much like a constructor) and then use this client in your `@ProcessElement` method.
Let's consider a simple example:
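```java
import org.apache.beam.sdk.transforms.DoFn;

// Nested inside your pipeline class. Record and MyClient are your own
// record type and database client.
static class MyDoFn extends DoFn<Record, Record> {
  // One client per DoFn instance. Not static: a static field would be shared by
  // all instances in the worker JVM, so one instance's @Teardown would close the
  // client the others are still using. Transient because the DoFn is serialized
  // and shipped to workers, while the client is created there in @Setup.
  private transient MyClient client;

  @Setup
  public void setup() {
    client = new MyClient("host");
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Record r = c.element();
    // Emit the record only if its ID is not already in external storage.
    if (!client.isRecordExist(r.id())) {
      c.output(r);
    }
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
      client = null;
    }
  }
}
```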
Also, to avoid a remote call for every record, you can buffer the records of a bundle internally (Beam splits input data into bundles) and check them for duplicates in one batch, if your client supports batch lookups. For this, you can use methods annotated with `@StartBundle` and `@FinishBundle`, which Beam calls right before and right after processing a bundle, respectively.
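A minimal sketch of that batching idea, assuming your client offers a batch lookup. `Record`, `BatchClient`, its `existingIds` method and the `ClientFactory` interface are hypothetical stand-ins for your own types; the Beam parts (`@StartBundle`, `@FinishBundle`, `FinishBundleContext.output(element, timestamp, window)`) are the real SDK API. Because output from `@FinishBundle` is not tied to a current element, each buffered record keeps its timestamp and window so it can be emitted back into the right window.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Hypothetical record type; only id() matters here.
class Record implements Serializable {
  private final String id;
  Record(String id) { this.id = id; }
  String id() { return id; }
}

// Hypothetical client that can look up many IDs in one remote call.
interface BatchClient extends AutoCloseable {
  // Returns the subset of ids that already exist in external storage.
  Set<String> existingIds(Collection<String> ids);

  @Override
  void close();
}

class BatchDedupFn extends DoFn<Record, Record> {
  // Hypothetical serializable factory, so the client itself is never serialized.
  interface ClientFactory extends Serializable {
    BatchClient create();
  }

  // A buffered record together with the timestamp and window it arrived in.
  static class Buffered {
    final Record record;
    final Instant timestamp;
    final BoundedWindow window;
    Buffered(Record r, Instant t, BoundedWindow w) { record = r; timestamp = t; window = w; }
  }

  private final ClientFactory factory;
  private transient BatchClient client;
  private transient List<Buffered> buffer;

  BatchDedupFn(ClientFactory factory) {
    this.factory = factory;
  }

  // Keeps records whose IDs are neither in storage nor repeated earlier in the batch.
  static List<Buffered> selectNew(List<Buffered> batch, Set<String> existing) {
    Set<String> seen = new HashSet<>(existing);
    List<Buffered> fresh = new ArrayList<>();
    for (Buffered b : batch) {
      if (seen.add(b.record.id())) {
        fresh.add(b);
      }
    }
    return fresh;
  }

  @Setup
  public void setup() {
    client = factory.create();
  }

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element Record r, @Timestamp Instant ts, BoundedWindow window) {
    buffer.add(new Buffered(r, ts, window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    if (buffer.isEmpty()) {
      return;
    }
    List<String> ids = new ArrayList<>();
    for (Buffered b : buffer) {
      ids.add(b.record.id());
    }
    // One remote lookup for the whole bundle instead of one per record.
    for (Buffered b : selectNew(buffer, client.existingIds(ids))) {
      // Outside @ProcessElement, output needs an explicit timestamp and window.
      c.output(b.record, b.timestamp, b.window);
    }
    buffer.clear();
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
      client = null;
    }
  }
}
```

This holds a whole bundle in memory, and bundle size is decided by the runner, so if bundles can get large you might prefer to also flush in fixed-size chunks. As a design choice, `selectNew` additionally drops repeated IDs within the same bundle, which the per-record version above would let through.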
For more complex examples, I'd recommend looking at the sink implementations in Beam IOs, such as KinesisIO.