Beam's big power comes from its advanced windowing capabilities, but they're also a bit confusing.
Having seen some oddities in local tests (I use RabbitMQ as an input Source), where messages were not always getting acked and fixed windows were not always closing, I started digging around StackOverflow and the Beam code base.
It seems there are Source-specific concerns with when exactly watermarks are set:
- RabbitMQ watermark does not advance: Apache Beam : RabbitMqIO watermark doesn't advance
- PubSub watermark does not advance for low volumes: https://issues.apache.org/jira/browse/BEAM-7322
- SQS IO does not advance the watermark during periods with no new incoming messages: https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sqs/SqsIO.java#L44
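In each of these reports the reader's watermark appears to be derived from the timestamps of messages it has read, so when nothing arrives it has nothing to move forward. Below is a plain-Java model of one possible workaround, with hypothetical class and method names (this is NOT Beam's `UnboundedReader` API): if the source has been idle for longer than a chosen timeout, the watermark follows the wall clock instead. It assumes that any message arriving after an idle period carries an event time no earlier than the wall clock at that point, and it uses the largest event time seen, so out-of-order messages would be treated as late.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model of idle-aware watermark tracking (NOT Beam's API).
 * The watermark follows the largest event time seen, but if no message has
 * arrived for longer than idleTimeout it advances to the wall-clock time.
 */
public class IdleWatermark {
    private final Duration idleTimeout;
    private Instant watermark = Instant.EPOCH;   // never moves backwards
    private Instant lastArrival = Instant.EPOCH; // wall-clock time of the last message

    public IdleWatermark(Duration idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /** Record a message with its event timestamp, observed at wall-clock time now. */
    public void onMessage(Instant eventTime, Instant now) {
        lastArrival = now;
        advanceTo(eventTime);
    }

    /** Return the current watermark, letting it follow the wall clock when idle. */
    public Instant getWatermark(Instant now) {
        boolean idle = Duration.between(lastArrival, now).compareTo(idleTimeout) > 0;
        if (idle) {
            advanceTo(now);
        }
        return watermark;
    }

    // Watermarks must be monotonic, so only ever move forward.
    private void advanceTo(Instant candidate) {
        if (candidate.isAfter(watermark)) {
            watermark = candidate;
        }
    }
}
```

The idle timeout is the trade-off: a short one closes windows promptly in quiet periods, but any message that was merely delayed longer than the timeout ends up behind the watermark.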
(and others). Further, there seem to be independent notions of Checkpoints (CheckpointMarks) as opposed to Watermarks.
So I suppose this is a multi-part question:
- What code is responsible for moving the watermark? It seems to be some combination of the Source and the Runner, but I can't seem to actually find it to understand it better (or tweak it for our use cases). This is a particular issue for me because in periods of low volume the watermark never advances and messages are not acked.
- I don't see much documentation about what a Checkpoint/CheckpointMark is conceptually (the non-code Beam documentation doesn't discuss it). How does a CheckpointMark interact with a Watermark, if at all?
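For context on the second question: in the Beam Java SDK an `UnboundedSource.UnboundedReader` exposes both `getWatermark()` and `getCheckpointMark()`, and `UnboundedSource.CheckpointMark` has a `finalizeCheckpoint()` method that the runner calls once the checkpoint has been durably committed. As far as I can tell, that callback (not the watermark) is where sources like RabbitMqIO and SqsIO ack or delete messages. The plain-Java model below uses hypothetical names (NOT Beam's classes) to sketch that separation: reading buffers message ids, a checkpoint snapshots them, and only finalizing the checkpoint acks them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical plain-Java model (NOT Beam's classes) of a source that ties
 * acknowledgements to checkpoint finalization rather than to the watermark.
 */
public class CheckpointAckModel {

    /** Stand-in for a CheckpointMark: remembers which messages it covers. */
    public static final class Checkpoint {
        private final List<String> messageIds;
        private final List<String> ackLog;

        Checkpoint(List<String> messageIds, List<String> ackLog) {
            this.messageIds = messageIds;
            this.ackLog = ackLog;
        }

        /** Called by the "runner" after it durably commits the checkpoint; ack here. */
        public void finalizeCheckpoint() {
            ackLog.addAll(messageIds); // a real source would ack against the broker
        }
    }

    private final List<String> pending = new ArrayList<>(); // read but not yet checkpointed
    private final List<String> acked = new ArrayList<>();   // records what has been acked

    /** Read a message; it is not acked yet. */
    public void read(String messageId) {
        pending.add(messageId);
    }

    /** Snapshot pending messages into a checkpoint and start a fresh batch. */
    public Checkpoint getCheckpointMark() {
        Checkpoint mark = new Checkpoint(new ArrayList<>(pending), acked);
        pending.clear();
        return mark;
    }

    /** Read-only view of every message id acked so far. */
    public List<String> ackedIds() {
        return Collections.unmodifiableList(acked);
    }
}
```

In this model acking never consults the watermark; whether the two end up coupled in practice would come down to when a particular runner chooses to take and finalize checkpoints.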