Apache Flink - Send event if no data was received for x minutes
Asked Answered
U

2

11

How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?

Uri answered 1/11, 2017 at 16:47 Comment(0)
L
17

Such an operator can be implemented using a ProcessFunction.

DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);

input
  // use keyBy to have keyed state. 
  // NullByteKeySelector will move all data to one task. You can also use other keys
  .keyBy(new NullByteKeySelector())
  // use process function with 60 seconds timeout
  .process(new TimeOutFunction(60 * 1000));

The TimeOutFunction is defined as follows. In this example it uses processing time.

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

  // delay after which an alert flag is thrown
  private final long timeOut;
  // state to remember the last timer set
  private transient ValueState<Long> lastTimer;

  public TimeOutFunction(long timeOut) {
    this.timeOut = timeOut;
  }

  @Override
  public void open(Configuration conf) {
    // setup timer state
    ValueStateDescriptor<Long> lastTimerDesc = 
      new ValueStateDescriptor<Long>("lastTimer", Long.class);
    lastTimer = getRuntimeContext().getState(lastTimerDesc);
  }

  @Override
  public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
    // get current time and compute timeout time
    long currentTime = ctx.timerService().currentProcessingTime();
    long timeoutTime = currentTime + timeOut;
    // register timer for timeout time
    ctx.timerService().registerProcessingTimeTimer(timeoutTime);
    // remember timeout time
    lastTimer.update(timeoutTime);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value()) {
      // it was, so no data was received afterwards.
      // fire an alert.
      out.collect(true);
    }
  }
}
Linzer answered 2/11, 2017 at 9:33 Comment(5)
A small tweek. This setup is fine if the stream receives data atleast once. Any way to detect, if the stream does not receive data at all. Not even once?Uri
Can I use your solution with Kafka consumer, including an out.collect in processElement? (my full question is #58280577). At the moment my consumer don't stop and fetches infinitely..Millrace
In the above example since we register a ProcessingTimeTimer. Shall we specify TimeCharacteristic.EventTime/ProcessingTime, or it does not matter in this context. Any hint?Equable
You can always register processing time timers. If you need event time, you should enable it via the TimeCharacteristicsLinzer
This does not answer the question. This only works if the process function receives an element at least once, as a timer can only be created in "processElement".Bushed
T
1

You could set up a time window with a custom trigger function. In the trigger function, every time the an event is received the "onEvent" method would set a processingTimeTrigger to "currentTime + desiredTimeDelay". Then when a new event comes, you delete the trigger that was previously set and make a new one. If an event doesn't come by the time the system time is the time on the processingTimeTrigger, it fires and the window would be processed. Even if no events came, the list of events that are going to be processed would just be empty.

Truss answered 1/11, 2017 at 19:42 Comment(2)
Code possible? atleast snippet?Uri
I would go with @Fabian Hueske's answer. More strait forward for your purposes.Truss

© 2022 - 2024 — McMap. All rights reserved.