Windowing with Apache Beam - Fixed Windows Don't Seem to be Closing?

We are attempting to use fixed windows on an Apache Beam pipeline (using DirectRunner). Our flow is as follows:

  1. Pull data from pub/sub
  2. Deserialize JSON into Java object
  3. Window events w/ fixed windows of 5 seconds
  4. Using a custom CombineFn, combine each window of Events into a List<Event>
  5. For the sake of testing, simply output the resulting List<Event>

Pipeline code:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn())
                );

The result we are seeing is that the final step is never run, as we don't get any logging.

Also, we have added a System.out.println("***") call in each method of our custom CombineFn class to track when they run, and it seems none of them are ever called.
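For context, a CombineFn of this shape looks roughly as follows. This is a minimal sketch only - the real CombineEventsFn is not shown here, so the accumulator type and the placement of the println calls are assumptions:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.beam.sdk.transforms.Combine;

    // Minimal sketch of a CombineFn that gathers Events into a List<Event>.
    // Not the actual CombineEventsFn; it only illustrates where the
    // System.out.println("***") tracking calls were placed.
    public class CombineEventsFn extends Combine.CombineFn<Event, List<Event>, List<Event>> {

        @Override
        public List<Event> createAccumulator() {
            System.out.println("*** createAccumulator");
            return new ArrayList<>();
        }

        @Override
        public List<Event> addInput(List<Event> accumulator, Event input) {
            System.out.println("*** addInput");
            accumulator.add(input);
            return accumulator;
        }

        @Override
        public List<Event> mergeAccumulators(Iterable<List<Event>> accumulators) {
            System.out.println("*** mergeAccumulators");
            List<Event> merged = new ArrayList<>();
            for (List<Event> accumulator : accumulators) {
                merged.addAll(accumulator);
            }
            return merged;
        }

        @Override
        public List<Event> extractOutput(List<Event> accumulator) {
            System.out.println("*** extractOutput");
            return accumulator;
        }
    }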

Is the windowing set up incorrectly here? We followed the example at https://beam.apache.org/documentation/programming-guide/#windowing and it seems fairly straightforward, but we are clearly missing something fundamental.

Any insight is appreciated - thanks in advance!

Deploy answered 16/5, 2017 at 21:23 Comment(2)
To answer your question, yes, the windowing is set up correctly here. Make sure you have elements coming through Pub/Sub by simply printing them in ParseEventFn(). The main thing is: did you call pipeline.run() to actually run the pipeline? As you said, nothing got printed.Stablish
Sorry, should have provided more context - I am already printing events in ParseEventFn and they are coming through - I am also using pipeline.run() to run the pipeline. I did add a trigger and this is causing the windows to emit events, which is good news - it looks like the windowing is based on event time (rather than processing time) and the windows don't know when to emit results w/o a trigger.Deploy

Looks like the main issue was indeed a missing trigger - the window was opening and there was nothing telling it when to emit results. We wanted to simply window based on processing time (not event time) and so did the following:

.apply("Window", Window
    .<Event>into(new GlobalWindows())
    .triggering(Repeatedly
        .forever(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))
        )
    )
    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)

Essentially this creates a single global window whose trigger fires 5 seconds after the first element of a pane is processed. The window itself never closes; after each firing the trigger resets and waits for the next element before starting another 5 second timer. Beam complained when we didn't have the withAllowedLateness piece - as far as I know this just tells it to ignore any late data.

My understanding may be a bit off the mark here, but the above snippet has solved our problem!

Deploy answered 17/5, 2017 at 18:17 Comment(9)
What is the purpose of the 50 second windows? If the goal is just to emit data every N seconds, you should be able to use the GlobalWindow along with the processing time trigger you mentioned.Cluff
Thanks @BenChambers! That didn't seem right to me either - tried your suggestion and it works :) Will update my answerDeploy
Your windowing set up is still correct, just a different computation entirely. Do you care about events that occurred within a 5 second period? This is windowing. Do you care about getting incremental results every 5 seconds during processing of a global aggregation? This is a global window + triggering. The usual cause of your prior behavior is the watermark not advancing, which would be caused by the timestamps on the elements.Lamonicalamont
Can you confirm, why you had changed from Fixed to Global Window? Does the same solution of triggering, work also with the Fixed Window?Drexler
Would be incredibly useful if logs indicated a Window that would never close/fire..Wrist
@KennKnowles can you elaborate on "The usual cause of your prior behavior is the watermark not advancing, which would be caused by the timestamps on the elements."? I feel I'm seeing a very similar issue in a very similar pipeline attempting to use fixed windows reading from rabbitmq. What would cause the watermark to not advance from an unbounded input?Tatting
@Tatting could you post a question and link to it? Then we can add it to the SO knowledge base.Lamonicalamont
Thanks Kenn. I asked a related-but-not-quite-the-same question here: #58221872Tatting
Why does the trigger not emit results by default? Is the global window infinite? Thanks in advance.Geography
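Following up on the watermark point in the comments above: with event-time fixed windows, the watermark can only advance if the source knows each element's event time. Below is a hedged sketch using the newer PubsubIO API, assuming the publisher puts the event time in a message attribute - the attribute name eventTimestamp is an assumption, as is the availability of this setting in the SDK version used in the question:

    // Sketch only, not from the original post: point PubsubIO at the message
    // attribute that carries each event's timestamp so the watermark can
    // advance and event-time windows can fire. The "eventTimestamp" attribute
    // name is an assumption.
    pipeline
        .apply(PubsubIO
            .readStrings()
            .fromTopic(options.getTopic())
            .withTimestampAttribute("eventTimestamp")
        )
        // ... then ParseEvent, Window, CombineEvents and LogEvent as in the question
        ;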

The underlying issue may not really be a missing global trigger. While a trigger does force output and may appear to solve the problem, the behavior is more likely the result of PubsubIO not setting the watermark correctly. As recorded in https://github.com/apache/beam/issues/19518, the watermark often does not advance at all when using the local (direct) runner. As a result, the original 5 second window's default trigger would never fire, because Beam is waiting for the watermark to progress and it never does.
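If you want to keep the 5 second event-time windows while developing against the DirectRunner, one possible workaround - a sketch under the assumption that a stalled watermark is indeed the cause, not something taken from this answer - is to add early firings based on processing time, so panes are emitted even while the watermark is not moving:

    // Sketch: keep the fixed event-time windows, but also fire early on
    // processing time so a stalled watermark does not block all output.
    // The 5 second early-firing delay mirrors the window size and is an
    // arbitrary choice for illustration.
    .apply("Window", Window
        .<Event>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))
            )
        )
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
    )

With discardingFiredPanes(), each early firing only contains the elements that arrived since the previous firing for that window.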

Classicism answered 16/5, 2024 at 14:24 Comment(0)
