How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?
Apache Flink - Send event if no data was received for x minutes
Such an operator can be implemented using a ProcessFunction.
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);
input
    // use keyBy to have keyed state.
    // NullByteKeySelector will move all data to one task. You can also use other keys
    .keyBy(new NullByteKeySelector())
    // use process function with 60 seconds timeout
    .process(new TimeOutFunction(60 * 1000));
The TimeOutFunction is defined as follows. In this example it uses processing time.
public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

    // delay after which an alert flag is raised
    private final long timeOut;
    // state to remember the last timer set
    private transient ValueState<Long> lastTimer;

    public TimeOutFunction(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        // setup timer state
        ValueStateDescriptor<Long> lastTimerDesc =
            new ValueStateDescriptor<Long>("lastTimer", Long.class);
        lastTimer = getRuntimeContext().getState(lastTimerDesc);
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
        // get current time and compute timeout time
        long currentTime = ctx.timerService().currentProcessingTime();
        long timeoutTime = currentTime + timeOut;
        // register timer for timeout time
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        // remember timeout time
        lastTimer.update(timeoutTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
        // check if this was the last timer we registered
        if (timestamp == lastTimer.value()) {
            // it was, so no data was received afterwards.
            // fire an alert.
            out.collect(true);
        }
    }
}
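To see why the comparison in onTimer suppresses stale timers, the logic can be modelled without Flink. The following is only a sketch in plain Java: the class TimeoutModel and its advanceTo method are hypothetical names made up for illustration, a TreeSet stands in for Flink's timer service, and a plain field stands in for the ValueState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of the timeout logic above; no Flink classes are used.
class TimeoutModel {
    private final long timeOut;
    // Stands in for Flink's timer service (an assumption of this sketch).
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stands in for the ValueState<Long> named lastTimer.
    private Long lastTimer;

    TimeoutModel(long timeOut) {
        this.timeOut = timeOut;
    }

    // Mirrors processElement: register a timer and remember it as the latest one.
    void processElement(long now) {
        long timeoutTime = now + timeOut;
        timers.add(timeoutTime);
        lastTimer = timeoutTime;
    }

    // Advances the clock to 'now', fires every due timer, and returns the
    // timestamps of the timers that raised an alert.
    List<Long> advanceTo(long now) {
        List<Long> alerts = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= now) {
            long timestamp = timers.pollFirst();
            // Mirrors onTimer: only the most recently registered timer alerts.
            if (lastTimer != null && timestamp == lastTimer) {
                alerts.add(timestamp);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        TimeoutModel model = new TimeoutModel(60_000L);
        model.processElement(0L);       // registers a timer at 60_000
        model.processElement(30_000L);  // registers a timer at 90_000, now the last one
        System.out.println(model.advanceTo(60_000L)); // prints [] (stale timer, no alert)
        System.out.println(model.advanceTo(90_000L)); // prints [90000] (last timer alerts)
    }
}
```

Every element leaves its timer registered, so old timers still fire; they are simply ignored because a later element has overwritten the remembered timestamp.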
A small tweak. This setup is fine if the stream receives data at least once. Is there any way to detect that the stream does not receive data at all, not even once? –
Uri
Can I use your solution with a Kafka consumer, including an out.collect in processElement? (My full question is #58280577.) At the moment my consumer doesn't stop and fetches infinitely. –
Millrace
In the above example, since we register a ProcessingTimeTimer, should we specify TimeCharacteristic.EventTime or ProcessingTime, or does it not matter in this context? Any hint? –
Equable
You can always register processing-time timers. If you need event time, you should enable it via the TimeCharacteristic –
Linzer
This does not answer the question. This only works if the process function receives an element at least once, as a timer can only be created in "processElement". –
Bushed
You could set up a time window with a custom trigger function. Every time an event is received, the trigger's "onElement" method would register a processing-time timer for "currentTime + desiredTimeDelay". When a new event arrives, you delete the timer that was previously set and register a new one. If no event arrives before the system time reaches the timer's timestamp, the timer fires and the window is processed. Even if no events came, the list of events to be processed would just be empty.
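The delete-and-re-register idea can be sketched without Flink as well. This is a plain-Java model, not a Flink Trigger implementation: ResettableTimerModel, onElement and advanceTo are hypothetical names for illustration, and overwriting a single field stands in for deleting the old processing-time timer and registering a new one. In an actual custom Trigger the same steps would presumably go through the TriggerContext (deleteProcessingTimeTimer and registerProcessingTimeTimer).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-Java model of "delete the old timer, register a new one"; no Flink classes.
class ResettableTimerModel {
    private final long delay;
    // At most one timer is pending at any time (null means none).
    private Long pendingTimer;
    // Elements collected since the last firing; stands in for the window contents.
    private final List<Long> buffer = new ArrayList<>();

    ResettableTimerModel(long delay) {
        this.delay = delay;
    }

    // Called for every element: buffer it and move the timer forward.
    void onElement(long value, long now) {
        buffer.add(value);
        // Overwriting the field models deleting the old timer and registering a new one.
        pendingTimer = now + delay;
    }

    // Advances the clock to 'now'. If the pending timer is due, the "window" fires:
    // the buffered elements are returned and the buffer is cleared.
    Optional<List<Long>> advanceTo(long now) {
        if (pendingTimer == null || pendingTimer > now) {
            return Optional.empty();
        }
        List<Long> fired = new ArrayList<>(buffer);
        buffer.clear();
        pendingTimer = null;
        return Optional.of(fired);
    }

    public static void main(String[] args) {
        ResettableTimerModel model = new ResettableTimerModel(60_000L);
        model.onElement(1L, 0L);        // timer at 60_000
        model.onElement(2L, 30_000L);   // timer moved to 90_000
        System.out.println(model.advanceTo(60_000L).isPresent()); // prints false
        System.out.println(model.advanceTo(90_000L).get());       // prints [1, 2]
    }
}
```

Because only one timer is ever pending, no stored timestamp has to be compared when it fires. Like the ProcessFunction approach, this model only arms a timer once a first element has arrived.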
Is code possible? At least a snippet? –
Uri
I would go with @Fabian Hueske's answer. It is more straightforward for your purposes. –
Truss