What actually manages watermarks in Beam?

Beam's great strength comes from its advanced windowing capabilities, but they are also a bit confusing.

I saw some oddities in local tests (I use RabbitMQ as an input source): messages were not always getting acked, and fixed windows were not always closing. So I started digging around Stack Overflow and the Beam code base.

It seems there are Source-specific concerns with when exactly watermarks are set:

(and others). Further, there seem to be independent notions of checkpoints (CheckpointMarks) as opposed to watermarks.

So I suppose this is a multi-part question:

  1. What code is responsible for moving the watermark? It seems to be some combination of the Source and the Runner, but I can't actually find it to understand it better (or to tweak it for our use cases). This is a particular issue for me because, in periods of low volume, the watermark never advances and messages are not acked.
  2. I don't see much documentation on what a checkpoint/CheckpointMark is conceptually (the non-code Beam documentation doesn't discuss it). How does a CheckpointMark interact with a watermark, if at all?
Allonym asked 3/10, 2019 at 14:32
  1. Each PCollection has its own watermark, which indicates how complete that particular PCollection is. The source is responsible for the watermark of the PCollection it produces. Watermarks propagate to downstream PCollections automatically, with no additional approximation; a downstream watermark can be roughly understood as "the minimum of the input PCollections' watermarks and any buffered state". So in your case, RabbitMqIO is the place to look for watermark problems. I am not familiar with this particular IO connector, but if you have not already done so, a bug report or an email to the user list would be worthwhile.
  2. A checkpoint is a source-specific piece of data that allows the source to resume reading without missing messages, as long as the runner durably persists the checkpoint. Message ACKs tend to happen during checkpoint finalization (CheckpointMark.finalizeCheckpoint()), since the runner calls that method only once it knows the messages never need to be re-read.
Lilly answered 3/10, 2019 at 17:52
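To make the "minimum of inputs and buffered state" rule from point 1 of the answer concrete, here is a toy Python model. The function names are hypothetical, times are integer event-time milliseconds, and this is a conceptual sketch rather than code from any Beam runner. It shows why a single quiet input is enough to keep a downstream fixed window from firing on time.

```python
from typing import Iterable


def downstream_watermark(input_watermarks: Iterable[int],
                         buffered_holds: Iterable[int] = ()) -> int:
    """Toy rule: output watermark = min(input watermarks, buffered-state holds).

    Hypothetical helper; values are event-time milliseconds.
    """
    candidates = list(input_watermarks) + list(buffered_holds)
    if not candidates:
        raise ValueError("need at least one input watermark")
    # Output can only claim completeness up to the least complete contributor.
    return min(candidates)


def window_fires_on_time(window_end_ms: int, watermark_ms: int) -> bool:
    """In this toy model, a fixed window [start, end) fires once the watermark reaches its end."""
    return watermark_ms >= window_end_ms


# One busy input at 120 s and one quiet input stuck at 30 s.
wm = downstream_watermark([120_000, 30_000])
print(wm, window_fires_on_time(60_000, wm))  # 30000 False
```

The quiet input alone holds the output back, which is why the place to look is the source that produces the stalled watermark, not the downstream transforms.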
Ah, I see. It looks like RabbitMqIO's reader intentionally conflates the two concepts (the watermark it returns is the CheckpointMark's oldest timestamp). And I believe I now see that the 'bug' I want to report is this: while, e.g., PubsubIO makes provisions for advancing the watermark during periods of inactivity, RabbitMqIO doesn't, so messages can simply 'get stuck' if no new ones come through. (By 'get stuck' I mean a fixed window will never fire 'on time' until a threshold of messages is seen.) I'll ticket this. - Allonym
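The conflation described in this comment can be sketched as a toy model. Everything below is hypothetical: `ToyCheckpointMark` and `ToyReader` only mimic the roles of Beam Java's `UnboundedSource.CheckpointMark` (with `finalizeCheckpoint()`) and `UnboundedReader.getWatermark()`. It is not RabbitMqIO's or PubsubIO's actual code, and the idle policy (jump to wall-clock time after a quiet period, assuming event time tracks wall-clock time) is an assumption chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class ToyCheckpointMark:
    """Hypothetical stand-in for a CheckpointMark: messages read but not yet ACKed."""
    pending: List[Tuple[int, int]] = field(default_factory=list)  # (delivery_tag, event_ms)
    acked: List[int] = field(default_factory=list)

    def oldest_timestamp(self) -> Optional[int]:
        return min((ts for _, ts in self.pending), default=None)

    def finalize(self) -> None:
        # Plays the role of finalizeCheckpoint(): the checkpoint is persisted,
        # so these messages never need re-reading and can be ACKed.
        self.acked.extend(tag for tag, _ in self.pending)
        self.pending.clear()


class ToyReader:
    """Hypothetical reader whose watermark is the oldest timestamp in its current mark."""

    def __init__(self, idle_timeout_ms: Optional[int] = None):
        self.mark = ToyCheckpointMark()
        self.watermark_ms = 0
        self.idle_timeout_ms = idle_timeout_ms  # None = never advance while idle
        self.last_message_wall_ms: Optional[int] = None

    def on_message(self, tag: int, event_ms: int, wall_ms: int) -> None:
        self.mark.pending.append((tag, event_ms))
        self.last_message_wall_ms = wall_ms
        self._update_watermark(wall_ms)

    def checkpoint_and_finalize(self, wall_ms: int) -> ToyCheckpointMark:
        # A real runner takes the mark, persists it, and finalizes it later;
        # the toy collapses those steps into one call.
        done, self.mark = self.mark, ToyCheckpointMark()
        done.finalize()
        self._update_watermark(wall_ms)
        return done

    def get_watermark(self, wall_ms: int) -> int:
        self._update_watermark(wall_ms)
        return self.watermark_ms

    def _update_watermark(self, wall_ms: int) -> None:
        oldest = self.mark.oldest_timestamp()
        idle = (self.idle_timeout_ms is not None
                and self.last_message_wall_ms is not None
                and wall_ms - self.last_message_wall_ms >= self.idle_timeout_ms)
        if oldest is not None:
            candidate = oldest  # hold at the oldest un-ACKed message
        elif idle:
            candidate = wall_ms  # assumption: event time tracks wall-clock time
        else:
            candidate = self.watermark_ms  # no new information: stay put
        self.watermark_ms = max(self.watermark_ms, candidate)  # never move backwards


stuck = ToyReader()
stuck.on_message(tag=1, event_ms=50_000, wall_ms=50_000)
print(stuck.checkpoint_and_finalize(wall_ms=51_000).acked)  # [1]
print(stuck.get_watermark(wall_ms=300_000))  # 50000: window [0, 60 s) never fires on time

idle_aware = ToyReader(idle_timeout_ms=10_000)
idle_aware.on_message(tag=1, event_ms=50_000, wall_ms=50_000)
idle_aware.checkpoint_and_finalize(wall_ms=51_000)
print(idle_aware.get_watermark(wall_ms=300_000))  # 300000
```

In the first reader the watermark is derived solely from the checkpoint's contents, so once the checkpoint is finalized and emptied, nothing can move it forward until new messages arrive; the idle-aware variant decouples the two.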
It turns out almost all of my confusion was due to some unexpected implementation choices in RabbitMqIO's default UnboundedSource implementation. I've documented them in BEAM-8347 and on Stack Overflow: https://mcmap.net/q/1174731/-apache-beam-rabbitmqio-watermark-doesn-39-t-advance - Allonym
Nice investigation. Thanks! - Lilly