How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>

I'm setting up a slow-changing lookup Map in my Apache Beam pipeline. The lookup map is updated continuously, and for each key I want the latest value in the global window, using accumulating mode. However, the pipeline always fails with this exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this code snippet?

If I use .discardingFiredPanes() instead, I lose the information from the previous emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap());

Example input per trigger firing:

t1: KV<k1,v1>, KV<k2,v2>
t2: KV<k1,v1>

accumulatingFiredPanes => expected result at t2 => KV<k1,v1>, KV<k2,v2>, but it fails with the duplicate-values exception

discardingFiredPanes => result at t2 => KV<k1,v1> (succeeds)

Morelos answered 29/1, 2019 at 13:46 Comment(15)
In your ReadSlowChangingTable, are you reading the entire table again, or just a diff? - Veolaver
@RezaRokni Entire table! - Morelos
You also should not need accumulatingFiredPanes afaik, will need to dig into that as well. - Veolaver
Ok, just wrote a little bit of test code to try this out. Can you confirm that when you use discardingFiredPanes you don't see the error? - Veolaver
I don't see any error when I use discardingFiredPanes. - Morelos
Hmm, I did it! Example input per trigger firing: t1: KV<k1,v1>, KV<k2,v2>; t2: KV<k1,v1>. accumulatingFiredPanes => expected result at t2 => KV<k1,v1>, KV<k2,v2>, but it failed with the duplicate-values exception. discardingFiredPanes => result at t2 => KV<k1,v1>, success. - Morelos
Great! So as you re-read the table every time, this should now give you what you need for your side input, correct? Also please keep in mind that this pattern is non-deterministic: the side input will eventually update on the workers, but there is no guarantee that it will be on all workers at the same time for the same windows etc. - Veolaver
@RezaRokni thx! Two more questions here. 1. I still can't understand why using accumulatingFiredPanes would cause the duplicate-values exception. Doesn't it aggregate all past values? 2. I want to clarify the meaning of non-deterministic. By using the global window here, does it mean that different workers may be using different versions of the PCollectionView<Map<String, String>> at the same time? Thank you! - Morelos
My assumption, which I should have checked :-) was that you are using this side input in another part of your pipeline which is in stream mode and not in a global window. Is that correct? For example, you have a FixedWindow of 5 mins and you are using this side input in a DoFn within that branch of your pipeline. - Veolaver
Actually, in the main branch of my pipeline I don't assign any window (the default should be the global window). The main branch just continuously pulls events from a Pub/Sub subscription, and the side input here is a slow-changing Map that helps me filter main-stream events. - Morelos
Ok thanx. So the DoFn will use whatever results came from the last trigger that fired, which here is based on processing time. There is a note at the bottom of section 4.4.2 in this doc around side inputs and windows: beam.apache.org/documentation/programming-guide/#side-inputs I am also looking into why accumulating mode + Latest is not working and discarding is. Will report back here once I dig some more; as a result of that you may need to move to using your own Map object stored in a normal View. Either way, will ping back once I get more info. - Veolaver
@RezaRokni I noticed that note too. Thanks again. - Morelos
Did some more digging, please see answer below. Hope it's helpful! Happy Apache Beaming! :-) - Veolaver
Why do you need the Latest.perKey transform? You are already aggregating the KVs into a map, so when a new KV comes it will update its value. - Stephanus
I see, because of the accumulating. If you were discarding then I guess you shouldn't need it, right? - Stephanus

Specifically, regarding the View.asMap and accumulating-panes discussion in the comments:

If you would like to use a View.asMap side input (for example, when the source of the map elements is itself distributed, often because you are creating the side input from the output of a previous transform), there are some other factors to take into consideration. View.asMap is itself an aggregation: it inherits the triggering of its input and accumulates that input. In this specific pattern, setting the pipeline to accumulating mode (accumulatingFiredPanes) before this transform results in duplicate key errors, even if a transform such as Latest.perKey is applied before View.asMap.
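
To make the duplicate-key behaviour concrete, here is a minimal plain-Java sketch of it. It is a simplified model under stated assumptions, not Beam's actual implementation: the class and its methods (AccumulatingPaneSketch, kv, buildMap, lastPane) are hypothetical names, and the model assumes that the per-key results emitted at each firing all land in the pane that the map-building step sees. The input values are the t1/t2 example from the question.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model only: it mimics the behaviour described above and is
// not how Beam implements View.asMap internally.
class AccumulatingPaneSketch {

    // Convenience factory for a key/value pair.
    static Map.Entry<String, String> kv(String key, String value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Hypothetical stand-in for the map-building step: a key may appear only once.
    static Map<String, String> buildMap(List<Map.Entry<String, String>> pane) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : pane) {
            if (result.putIfAbsent(e.getKey(), e.getValue()) != null) {
                throw new IllegalArgumentException("Duplicate values for " + e.getKey());
            }
        }
        return result;
    }

    // Returns the pane contents seen by the map-building step at the last firing.
    static List<Map.Entry<String, String>> lastPane(
            List<List<Map.Entry<String, String>>> firings, boolean accumulating) {
        List<Map.Entry<String, String>> pane = new ArrayList<>();
        for (List<Map.Entry<String, String>> firing : firings) {
            if (!accumulating) {
                pane.clear(); // discarding: every pane starts empty
            }
            pane.addAll(firing); // accumulating: earlier elements stay in the pane
        }
        return pane;
    }

    public static void main(String[] args) {
        // Per-key results emitted upstream at t1 and t2.
        List<List<Map.Entry<String, String>>> firings = Arrays.asList(
            Arrays.asList(kv("k1", "v1"), kv("k2", "v2")),
            Arrays.asList(kv("k1", "v1")));

        System.out.println(buildMap(lastPane(firings, false))); // {k1=v1}
        try {
            buildMap(lastPane(firings, true));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Duplicate values for k1
        }
    }
}
```

In this model the discarding pane at t2 holds only k1, so the map builds but k2 is gone, while the accumulating pane holds k1 twice and the build fails. That matches the two outcomes reported in the question.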

Given that each read replaces the whole map, I think View.asSingleton is a better approach for this use case.

Some general notes around this pattern, which will hopefully be useful for others as well:

For this pattern:

1. Use the GenerateSequence source transform to emit a value periodically, for example once a day.
2. Pass this value into a global window via a data-driven trigger that activates on each element.
3. In a DoFn, use each emitted value as the trigger to pull data from your bounded source.
4. Create your side input for use in downstream transforms.

It's important to note that, because this pattern uses a global-window side input that triggers on processing time, matching it to elements being processed in event time is nondeterministic. For example, if the main pipeline is windowed on event time, the version of the side input view that those windows see depends on the latest trigger that has fired in processing time rather than on the event time.

Also note that, in general, the side input should be something that fits into memory.
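
As a rough illustration of that point, the following plain-Java sketch models the side input as a series of snapshots keyed by the processing time at which each trigger fired. It is a simplified model, not Beam code: SideInputVersionSketch, fire and versionSeenBy are hypothetical names, time is in arbitrary units, and the model ignores the delay with which a new snapshot reaches individual workers.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model, not Beam code: the side input is a series of snapshots
// keyed by the processing time at which each trigger fired.
class SideInputVersionSketch {

    private final TreeMap<Long, String> firedVersions = new TreeMap<>();

    // Record that a trigger fired at the given processing time.
    void fire(long processingTime, String version) {
        firedVersions.put(processingTime, version);
    }

    // The version an element sees depends on when it is processed,
    // not on its own event timestamp (which is deliberately unused here).
    String versionSeenBy(long eventTime, long processingTime) {
        Map.Entry<Long, String> latest = firedVersions.floorEntry(processingTime);
        return latest == null ? null : latest.getValue();
    }

    public static void main(String[] args) {
        SideInputVersionSketch sideInput = new SideInputVersionSketch();
        sideInput.fire(0, "map-v1");
        sideInput.fire(60, "map-v2");

        // Two elements with the same event time, processed at different moments.
        System.out.println(sideInput.versionSeenBy(10, 30)); // map-v1
        System.out.println(sideInput.versionSeenBy(10, 90)); // map-v2
    }
}
```

Both elements carry the same event time, yet they see different versions of the map because they are processed on either side of the second firing.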

Java (SDK 2.9.0):

In the sample below, the side input is updated at very short intervals so that the effects can be seen easily. In practice, the expectation is that the side input updates slowly, for example every few hours or once a day.

The example code makes use of a Map created in a DoFn, which becomes the View.asSingleton. This is the recommended approach for this pattern.

The sample illustrates the pattern. Note that the View.asSingleton is rebuilt on every counter update.

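// Imports needed by this sample. The enclosing class name is illustrative only.
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowlyUpdatingSideInputSample {

  private static final Logger LOG =
      LoggerFactory.getLogger(SlowlyUpdatingSideInputSample.class);

  public static void main(String[] args) {

    // Create pipeline
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(PipelineOptions.class);

    // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
    // Run in debug mode to see the output
    Pipeline p = Pipeline.create(options);

    // ---- Create slowly updating side input

    PCollectionView<Map<String, String>> map = p
        // Emit one element every 5 seconds to drive the refresh.
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

        // Fire on every element and discard fired panes, so each pane holds one map only.
        .apply(Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())

        .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(@Element Long input, OutputReceiver<Map<String, String>> o) {
            // Do any external reads needed here...
            // We will make use of our dummy external service.
            // Every time this triggers, the complete map will be replaced with that read
            // from the service.
            o.output(DummyExternalService.readDummyData());
          }
        }))
        .apply(View.asSingleton());

    // ---- Consume slowly updating side input

    // GenerateSequence is only used here to generate dummy data for this illustration.
    // You would use your real source, for example PubSubIO, KafkaIO etc...
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(Sum.longsGlobally().withoutDefaults())
        .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

          @ProcessElement
          public void process(ProcessContext c) {
            // Read whichever version of the map was produced by the latest firing.
            Map<String, String> keyMap = c.sideInput(map);
            c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

            LOG.debug("Value is {} key A is {} and key B is {}",
                c.element(), keyMap.get("Key_A"), keyMap.get("Key_B"));
          }
        }).withSideInputs(map));

    p.run();
  }

  public static class DummyExternalService {

    public static Map<String, String> readDummyData() {

      Map<String, String> map = new HashMap<>();
      Instant now = Instant.now();

      // Joda-Time pattern: HH = hour of day, mm = minute, ss = second.
      // (The original "HH:MM:SS" would print month and fraction of second.)
      DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");

      map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
      map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

      return map;
    }
  }
}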
Veolaver answered 11/2, 2019 at 3:33 Comment(1)
Thanks again @Reza Rokni! It is very useful! - Morelos
