I am somewhat confused by how Flink deals with late elements when watermarking on event time.
My understanding is that as Flink reads a stream of data, the watermark is advanced whenever it sees an element with a larger event time than the current watermark. Then, any windows which cover a time strictly less than the watermark are triggered for eviction (assuming no allowed lateness).
However, take this minimal example:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}
object EventTimeExample {

  // Silence framework logging so only the window output is printed
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]): Unit = {
    // Set up a local environment with parallelism 1, using event time
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Example data: the fourth element arrives after one with a later timestamp
    val simple = env.fromCollection(Seq(
        ExampleType(1525132800000L, 1),
        ExampleType(1525132800000L, 2),
        ExampleType(1525132920000L, 3),
        ExampleType(1525132800000L, 4)
      ))
      .assignAscendingTimestamps(_.time)

    // Emit (window start, window end, values) for each 60-second tumbling window
    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply {
        (window, iter, collector: Collector[(Long, Long, String)]) => {
          collector.collect((window.getStart, window.getEnd, iter.map(_.value).toString()))
        }
      }

    windows.print()
    env.execute("TimeStampExample")
  }
}
The result of running this is:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
However, if my understanding is correct, the 4 should not be included in the first window here, as the watermark should have been updated when the record with value 3 was reached.
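To make the expectation concrete, here is a rough plain-Scala sketch of my mental model. It does not use Flink at all, and it is only how I assumed things work, not how Flink is actually implemented. The names `ExpectedModel`, `windowStart` and `expected` are made up for illustration, and the key assumption (possibly the wrong one) is that the watermark advances immediately on every element with a larger timestamp.

```scala
// Plain-Scala simulation of my mental model (no Flink involved).
object ExpectedModel {
  case class ExampleType(time: Long, value: Long)

  // 60-second tumbling windows, as in the job above
  val windowSize: Long = 60000L

  // Start of the tumbling window that contains the given timestamp
  def windowStart(ts: Long): Long = ts - (ts % windowSize)

  // Returns (window start, window end, values) per window, dropping any
  // element whose window lies entirely before the current watermark.
  def expected(events: Seq[ExampleType]): Seq[(Long, Long, List[Long])] = {
    var watermark = Long.MinValue
    // LinkedHashMap keeps windows in the order they were first seen
    val open = scala.collection.mutable.LinkedHashMap.empty[Long, List[Long]]
    for (e <- events) {
      val start = windowStart(e.time)
      val end = start + windowSize
      // Under my model the window is still open only if it ends after the watermark
      if (end > watermark) {
        open(start) = open.getOrElse(start, Nil) :+ e.value
      }
      // Assumption: the watermark moves forward on every larger timestamp
      watermark = math.max(watermark, e.time)
    }
    open.toSeq.map { case (s, vs) => (s, s + windowSize, vs) }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2),
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    )
    expected(events).foreach(println)
  }
}
```

Under this model the first window would contain only the values 1 and 2, with 4 dropped as late, which is what I expected the Flink job to print.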
Now I recognise this is a trivial example, but not understanding this is making it hard to understand more complicated flows.