Side inputs vs normal constructor parameters in Apache Beam
Asked Answered
O

1

5

I have a general question on side inputs and broadcasting in the context of Apache Beam. Does any additional variables, lists, maps that are need for computation during processElement, need to be passed as side input? Is it ok if they are passed as normal constructor arguments for the DoFn ? For example, what if I have some fixed (not computed) values variables (constants, like start date, end date) that I want to make use of during the per element computation of processElement. Now, I can make singleton PCollectionViews out of each of those variables separately and pass them to the DoFn constructor as side input. However, instead of doing that, can I not just pass each of those constants as normal constructor arguments to the DoFn? Am I missing anything subtle here?

In terms of code, when should I do:

public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
  // these are singleton views
  private final PCollectionView<LocalDateTime> dateStartView;
  private final PCollectionView<LocalDateTime> dateEndView;

  public MyFilter(PCollectionView<LocalDateTime> dateStartView,
                       PCollectionView<LocalDateTime> dateEndView){

      this.dateStartView = dateStartView;
      this.dateEndView = dateEndView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception{
  // extract date values from the singleton views here and use them

As opposed to :

public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
  private final LocalDateTime dateStart;
  private final LocalDateTime dateEnd;

  public MyFilter(LocalDateTime dateStart,
                       LocalDateTime dateEnd){

    this.dateStart = dateStart;
    this.dateEnd = dateEnd;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception{
  // use the passed in date values directly here

Notice that in these examples, startDate and endDate are fixed values and not the dynamic results of any previous computation of the pipeline.

Onaonager answered 15/7, 2019 at 21:21 Comment(0)
B
6

When you call something like pipeline.apply(ParDo.of(new MyFilter(...)) the DoFn gets instantiated in the main program that you use to start the pipeline. It then gets serialized and passed to the runner for execution. Runner then decides where to execute it, e.g. on a fleet of a 100 VMs each of which will receive its own copy of the code and serialized data. If the member variables are serializable and you don't mutate them at execution time, it should be fine (link, link), the DoFn will get deserialized on each node with all the fields populated, and will get executed as expected. However you don't control the number of instances or basically their lifecycle (to some extent), so mutate them at your own risk.

The benefit of PCollections and side inputs is that you are not limited to static values, so for couple of simple unmutable values you should be fine .

Broadsword answered 15/7, 2019 at 21:52 Comment(1)
Sorry, this doesn't help me understand the OP's question. I think you are saying that, in his case, it makes no difference whether the startDate and endDate are passed as side input singletons or directly as constructor arguments. Do I understand you correctly? If one method is preferred -- for static, pre-determined values -- which method is preferred?Selfrenunciation

© 2022 - 2024 — McMap. All rights reserved.