What does reshuffling, in the context of exactly-once processing in BigQuery sink, mean?
Asked Answered
B

2

5

I'm reading an article on exactly-once processing implemented by some Dataflow sources and sinks and I'm having troubles understanding the example on BigQuery sink. From the article

Generating a random UUID is a non-deterministic operation, so we must add a reshuffle before we insert into BigQuery. Once that is done, any retries by Cloud Dataflow will always use the same UUID that was shuffled. Duplicate attempts to insert into BigQuery will always have the same insert id, so BigQuery is able to filter them

// Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 });

What does reshuffle mean and how can it prevent generation of different UUID for the same insert on subsequent retries ?

Bamford answered 26/9, 2018 at 14:52 Comment(0)
D
5

Reshuffle groups the data in a different way. However, here it is used for its side-effects: checkpointing and deduplication.

Without reshuffle, if the same task generates UUID and inserts data to BigQuery, there is a risk the worker restarts and the new worker would generate a new UUID and sends different row to BigQuery, resulting in duplicate rows.

Reshuffle operation splits UUID generation and BigQuery insert into two steps, and inserts checkpointing and deduplication between them.

  1. First, UUID are generated and sent to reshuffle. If UUID generation worker is restarted, it is OK, as reshuffle deduplicates rows, eliminating data from failed / restarted workers.
  2. The generated UUIDs are checkpointed by the shuffle operation.
  3. BigQuery insert worker uses checkpointed UUIDs, so even if it is restarted - it sends exactly the same data to BigQuery.
  4. BigQuery deduplicates data using these UUIDs, so duplicates from restarted insert worker are eliminated in BigQuery.
Decadence answered 28/9, 2018 at 19:47 Comment(0)
D
2

I think the article provides a good explanation on why "reshuffle" helps moving from "at least once" to "exactly once":

Specifically, the window might attempt to fire with element e0, e1, e2, but the worker crashes before committing the window processing (but not before those elements are sent as a side effect). When the worker restarts the window will fire again, but now a late element e3 shows up. Since this element shows up before the window is committed, it’s not counted as late data, so the DoFn is called again with elements e0, e1, e2, e3. These are then sent to the side-effect operation. Idempotency does not help here, as different logical record sets were sent each time.

There are other ways non-determinism can be introduced. The standard way to address this risk is to rely on the fact that Cloud Dataflow currently guarantees that only one version of a DoFn's output can make it past a shuffle boundary.

You can also check Reshuffle's docs:

There's a note there about deprecating this class, so future implementations of BigQueryIO might differ.

Decided answered 27/9, 2018 at 5:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.