Apache Beam : RabbitMqIO watermark doesn't advance

I need some help, please. I'm trying to use Apache Beam with the RabbitMqIO source (version 2.11.0) and the AfterWatermark.pastEndOfWindow trigger. The RabbitMqIO watermark does not seem to advance: it stays at the same value, so the AfterWatermark trigger never fires. Triggers that do not depend on the watermark (e.g. AfterProcessingTime, AfterPane) work as expected. My code is below. Thanks.

public class Main {

private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

// Window declaration with trigger
public static Window<RabbitMqMessage> window() {
    return Window. <RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes();
}

public static void main(String[] args) {
    SpringApplication.run(Main.class, args);

    // pipeline creation
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Using RabbitMqIO
    PCollection<RabbitMqMessage> messages = pipeline
            .apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:5672").withQueue("test"));

    PCollection<RabbitMqMessage> windowedData = messages.apply("Windowing", window());

    windowedData.apply(Combine.globally(new MyCombine()).withoutDefaults());

    pipeline.run();
    }

}

class MyCombine implements SerializableFunction<Iterable<RabbitMqMessage>,   RabbitMqMessage> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyCombine.class);

/**
 * 
 */
private static final long serialVersionUID = 6143898367853230506L;

@Override
public RabbitMqMessage apply(Iterable<RabbitMqMessage> input) {
    LOGGER.info("After trigger launched");
    return null;
}

}
Cassiecassil answered 17/4, 2019 at 22:4 Comment(2)
I've been having exactly the same issue (even on Beam 2.15.0). I was about to verify that this doesn't reproduce using Kafka or Pub/Sub. Have you filed a BEAM ticket for this? - Kylynn
I created issues.apache.org/jira/browse/BEAM-8347 for this. - Kylynn

I spent a lot of time looking into this. After opening https://issues.apache.org/jira/browse/BEAM-8347 I left some notes in the ticket on what I think the problems are with the current implementation.

Restated here:

The documentation for UnboundedSource.getWatermark reads:

[watermark] can be approximate. If records are read that violate this guarantee, they will be considered late, which will affect how they will be processed. ...

However, this value should be as late as possible. Downstream windows may not be able to close until this watermark passes their end.

For example, a source may know that the records it reads will be in timestamp order. In this case, the watermark can be the timestamp of the last record read. For a source that does not have natural timestamps, timestamps can be set to the time of reading, in which case the watermark is the current clock time.

The implementation in UnboundedRabbitMqReader uses the oldest timestamp as the watermark, in violation of the above suggestion.

Further, the timestamp applied is delivery time, which should be monotonically increasing. We should reliably be able to increase the watermark on every message delivered, which mostly solves the issue.

Finally, we can make provisions for advancing the watermark even when no messages have come in. When there are no new messages, it should be OK to advance the watermark following the approach taken in the KafkaIO TimestampPolicyFactory when the stream is 'caught up'. In this case, we would advance the watermark to, e.g., max(current watermark, NOW - 2 seconds) when we see no new messages, just to ensure windows/triggers can fire without requiring new data.
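
The two rules above (advance on every delivered message, and advance to max(current watermark, NOW - 2 seconds) when idle) can be sketched in isolation. This is only an illustration, not the code in UnboundedRabbitMqReader or in the upstream PR: the class WatermarkTracker and its method names are hypothetical, and it uses java.time instead of the Joda-Time Instant that Beam uses so that it compiles on its own.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of Beam or RabbitMqIO. */
final class WatermarkTracker {
    // How far behind "now" the watermark is held when no messages arrive (assumed: 2 seconds).
    private final Duration idleLag;
    private Instant watermark;

    WatermarkTracker(Instant initial, Duration idleLag) {
        this.watermark = initial;
        this.idleLag = idleLag;
    }

    /** Called for each delivered message; the watermark never moves backwards. */
    Instant onMessage(Instant deliveryTime) {
        if (deliveryTime.isAfter(watermark)) {
            watermark = deliveryTime;
        }
        return watermark;
    }

    /** Called when a poll returns no message: max(current watermark, now - idleLag). */
    Instant onIdle(Instant now) {
        Instant candidate = now.minus(idleLag);
        if (candidate.isAfter(watermark)) {
            watermark = candidate;
        }
        return watermark;
    }

    /** The value a reader's getWatermark() would report under this sketch. */
    Instant current() {
        return watermark;
    }
}
```

In a real reader, onMessage would be called where a delivery is received and onIdle where a poll comes back empty, with getWatermark returning current(). Taking the maximum in both paths keeps the watermark monotonic even if a delivery timestamp arrives slightly out of order.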


Unfortunately, it's difficult to make these slight modifications locally as the Rabbit implementations are closed to extension, and are mostly private or package-private.

Update: I've opened a PR upstream to address this. Changes here: https://github.com/apache/beam/pull/9820

Kylynn answered 16/10, 2019 at 21:36 Comment(0)
