Flink WaterMark And Triggers - Late elements not discarded on event time?

I am somewhat confused by how Flink deals with late elements when watermarking on event time.

My understanding is that as Flink reads a stream of data, the watermark advances whenever it sees an element whose event time is larger than the current watermark. Any window that ends strictly before the watermark is then triggered and purged (assuming no allowed lateness).

However, take this minimal example:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]): Unit = {

    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // In-memory example input; the element with value 4 arrives out of order
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2),
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply {
        (window, iter, collector: Collector[(Long, Long, String)]) => {
          // Emit (window start, window end, values in the window) as an explicit tuple
          collector.collect((window.getStart, window.getEnd, iter.map(_.value).toString))
        }
      }

    windows.print
    env.execute("TimeStampExample")
  }
}

The result of running this is:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

However, if my understanding is correct, the element with value 4 should not be included in the first window, because the watermark should already have advanced when the record with value 3 was reached.

Now I recognise this is a trivial example, but not understanding this is making it hard to understand more complicated flows.

Malefactor answered 1/5, 2018 at 9:47 Comment(0)

Your understanding is basically correct, but there are a few more things going on here that need to be taken into account.

First of all, you've used assignAscendingTimestamps(), which can only be used when the event stream is perfectly in order (by timestamp), which isn't the case here. You should see this warning when you run this application:

WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1525132800000 < 1525132920000

The other factor at work here is that an AscendingTimestampExtractor does not update the current Watermark for every passing stream element. It is an example of a periodic watermark generator: it injects a Watermark into the stream every n milliseconds, where n is set by ExecutionConfig.setAutoWatermarkInterval(...) and defaults to 200 ms. All four elements of this tiny collection are processed well within one such interval, so no Watermark reflecting event #3 has been emitted by the time event #4 arrives. This is how event #4 sneaks into the first window.
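
To make the timing concrete, here is a small plain-Scala model of the two behaviours. It is a sketch, not Flink code: the object name and the `run` helper are made up for illustration. It assumes that a window stops accepting elements once the watermark has reached its last timestamp (end - 1), and that the periodic timer never fires before all four elements have been read.

```scala
// Toy model of event-time windowing; none of these names are Flink APIs.
object WatermarkTimingSketch {
  val windowSize = 60000L

  // (timestamp, value) pairs from the question, in arrival order
  val events: Seq[(Long, Long)] = Seq(
    (1525132800000L, 1L),
    (1525132800000L, 2L),
    (1525132920000L, 3L),
    (1525132800000L, 4L)
  )

  // Start of the tumbling window that contains ts
  def windowStart(ts: Long): Long = ts - (ts % windowSize)

  // watermarkAfterEach = true  models one watermark per element;
  // watermarkAfterEach = false models a periodic generator whose timer
  // does not fire before the input is exhausted (an assumption).
  def run(watermarkAfterEach: Boolean): Map[Long, List[Long]] = {
    var watermark = Long.MinValue
    var windows = Map.empty[Long, List[Long]]
    for ((ts, value) <- events) {
      val start = windowStart(ts)
      val maxTimestamp = start + windowSize - 1
      // Keep the element only if its window has not been passed by the watermark
      if (maxTimestamp > watermark)
        windows = windows.updated(start, windows.getOrElse(start, Nil) :+ value)
      if (watermarkAfterEach) watermark = math.max(watermark, ts)
    }
    windows // whatever is still buffered would be emitted at end of input
  }
}
```

In this model `WatermarkTimingSketch.run(watermarkAfterEach = false)` keeps value 4 in the first window, as in the output shown in the question, while `run(watermarkAfterEach = true)` drops it.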

To get the results you expect, you could implement a punctuated watermark generator configured to generate a watermark for every event:

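import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
  // Use the event's own time field as its event-time timestamp
  override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
    element.time
  }

  // Emit a watermark after every element, equal to that element's timestamp
  override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }
}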

which you would then use like this:

val simple = env.fromCollection(Seq(
  ExampleType(1525132800000L, 1),
  ExampleType(1525132800000L, 2),
  ExampleType(1525132920000L, 3),
  ExampleType(1525132800000L, 4)
))
  .assignTimestampsAndWatermarks(new PunctuatedAssigner)

Now your example produces these results:

(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))

Event #4 has been dropped because it is late. This could be adjusted by relaxing the watermark generator so as to accommodate some amount of out-of-orderness. E.g.,

override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
  // Let the watermark lag 200 seconds (200000 ms) behind the latest timestamp
  new Watermark(extractedTimestamp - 200000)
}

which then produces these results:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
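
The arithmetic behind this result can be checked in a few lines of plain Scala. This is only a sketch with illustrative names, and it assumes that a window ending at E stays open while the watermark is below E - 1.

```scala
// Worked numbers for the lagging watermark; not Flink code.
object LagArithmetic {
  val lagMs = 200000L
  val event3Timestamp = 1525132920000L
  val firstWindowMaxTimestamp = 1525132860000L - 1

  // Watermark emitted right after event #3 when lagging by lagMs
  val watermarkAfterEvent3: Long = event3Timestamp - lagMs

  // The first window is still open if the watermark has not reached its last timestamp
  val firstWindowStillOpen: Boolean = firstWindowMaxTimestamp > watermarkAfterEvent3
}
```

Here the watermark after event #3 is 1525132720000, which is earlier than the first window's last timestamp, so event #4 is still accepted.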

Or you could configure the windows to allow late events:

val windows = simple
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
  .allowedLateness(Time.seconds(200))
  ...

which then causes the first window to fire twice:

(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
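
Why event #4 is accepted here can be sketched as a single comparison. The helper below is hypothetical plain Scala, not a Flink API. It assumes a window keeps accepting elements while its last timestamp plus the allowed lateness is still greater than the current watermark, which is my reading of how the window operator decides lateness.

```scala
// Toy lateness check; the object and method names are illustrative only.
object AllowedLatenessSketch {
  // Assumed rule: accept while (windowEnd - 1) + allowedLateness > watermark
  def accepts(windowEnd: Long, allowedLatenessMs: Long, watermark: Long): Boolean =
    (windowEnd - 1) + allowedLatenessMs > watermark

  val firstWindowEnd = 1525132860000L
  // Watermark after event #3 when a watermark is emitted for every element
  val watermarkAfterEvent3 = 1525132920000L

  val withoutLateness: Boolean = accepts(firstWindowEnd, 0L, watermarkAfterEvent3)
  val withLateness: Boolean = accepts(firstWindowEnd, 200000L, watermarkAfterEvent3)
}
```

With no allowed lateness the check fails and event #4 is dropped. With 200 seconds it passes, so the window that already fired for events 1 and 2 fires again with event #4 included.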

Note that since processing Watermarks imposes some overhead, you wouldn't normally want to use punctuated watermarks in this way (with a Watermark for every event). For most applications, periodic watermarks based on a BoundedOutOfOrdernessTimestampExtractor are a better choice.
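
For intuition, a bounded-out-of-orderness generator can be modelled as tracking the largest timestamp seen so far and reporting that value minus a fixed bound whenever it is polled. The class below is a hypothetical plain-Scala stand-in, not Flink's BoundedOutOfOrdernessTimestampExtractor, and the 200-second bound is simply the value used earlier in this answer.

```scala
// Hypothetical model of a periodic, bounded-out-of-orderness watermark generator.
class BoundedLagModel(maxOutOfOrdernessMs: Long) {
  // Start low enough that the first poll cannot underflow
  private var maxSeen: Long = Long.MinValue + maxOutOfOrdernessMs

  // Called for every element: only remembers the largest timestamp
  def onEvent(timestamp: Long): Unit =
    maxSeen = math.max(maxSeen, timestamp)

  // Called periodically (for example every 200 ms), not once per element
  def currentWatermark: Long = maxSeen - maxOutOfOrdernessMs
}

object BoundedLagDemo {
  val generator = new BoundedLagModel(200000L)
  // Timestamps from the question, in arrival order
  Seq(1525132800000L, 1525132800000L, 1525132920000L, 1525132800000L)
    .foreach(generator.onEvent)

  val watermark: Long = generator.currentWatermark
}
```

Because the per-element work is just a comparison, the cost of producing and processing Watermarks is paid only once per polling interval, and the out-of-order element does not move the watermark backwards.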

Gilligan answered 1/5, 2018 at 13:18 Comment(5)
Wow, thanks for your detailed answer. I don't see that warning, because I've actually turned warnings off at the start of the code I posted :-) However, if the event "4" really is late, does this warning apply? I'm still a little bit lost on the best way to handle the more complicated case I have, but I'll move that to the mailing list. Your explanation helps a lot. - Malefactor
Yes, the warning applies. The ascending timestamps watermark generator is not intended to be used in cases where the timestamps aren't ascending. - Gilligan
By "not intended" do you mean it will actually misbehave? I couldn't tell if this would actually cause the watermark to go back in time. - Malefactor
The watermark will not go back in time; the implementation is protected against that. But I don't believe the exact behavior of how out-of-order events will be processed (or not) is specified, or guaranteed to remain the same. - Gilligan
Hmm okay, I'm a bit confused by this. From what I could see in the source code it just takes the timestamp and extracts it, but I'll keep digging to understand. - Malefactor

If BoundedOutOfOrdernessTimestampExtractor is used, the result for the last window is not emitted until a new event arrives. Using system time in the watermark works, but when you re-run over messages with embedded timestamps (past events), the windows for those messages are not computed.

Agape answered 30/9, 2020 at 13:5 Comment(0)
