We are attempting to use fixed windows on an Apache Beam pipeline (using DirectRunner
). Our flow is as follows:
- Pull data from pub/sub
- Deserialize JSON into Java object
- Window events w/ fixed windows of 5 seconds
- Using a custom
CombineFn
, combine each window ofEvent
s into aList<Event>
- For the sake of testing, simply output the resulting
List<Event>
Pipeline code:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);
The result we are seeing is that the final step is never run, as we don't get any logging.
Also, we have added System.out.println("***")
in each method of our custom CombineFn
class, in order to track when these are run, and it seems they don't run either.
Is windowing set up incorrectly here? We followed an example found at https://beam.apache.org/documentation/programming-guide/#windowing and it seems fairly straightforward, but clearly there is something fundamental missing.
Any insight is appreciated - thanks in advance!
ParseEventFn()
. The major thing is, did you usepipeline.run()
to actually run the pipeline ? As you said nothing got printed. – StablishParseEventFn
and they are coming through - I am also usingpipeline.run()
to run the pipeline. I did add a trigger and this is causing the windows to emit events, which is good news - it looks like the windowing is based on event time (rather than processing time) and the windows don't know when to emit results w/o a trigger. – Deploy