-
window+watermark 来处理乱序数据
对于 TumblingEventTimeWindows
window 的元数据startTime,endTime
和程序启动时间无关,当你指定出 window.size 时, window的startTime,endTime
就分配好了
-
allowedLateness 来处理迟到的数据
相当于延迟了window 的生命周期, 【startTime,endTime) -> [startTime,endTime+ allowedLateness]
-
sideOutput 是最后的兜底策略, 当window 的生命周期结束后, 延迟的数据可以通过侧输出收集起来,自定义后续的处理流程
- 程序
- import java.util.Date
- import org.apache.flink.api.scala._
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
- import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
- import org.apache.flink.streaming.api.watermark.Watermark
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
- object LastElement {
- case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
- override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
- }
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
- // 创建延迟数据 OutputTag, 标记为 late-data
- val lateOutputTag = OutputTag[Goods]("late-data")
- val stream = env
- .socketTextStream("localhost", 9999)
- .filter(_.nonEmpty)
- .map(x => {
- val arr = x.split(",")
- Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
- })
- .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
- val maxOutOfOrderness = 2L // 最大无序数据到达的时间,用来生成水印2ms
- var currentMaxTimestamp: Long = _
- override def getCurrentWatermark: Watermark = {
- new Watermark(currentMaxTimestamp - maxOutOfOrderness)
- }
- override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
- currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
- element.time
- }
- })
- val streamFunc = stream
- .keyBy(_.id)
- .timeWindow(Time.milliseconds(10))
- .trigger(EventTimeTrigger.create())
- .allowedLateness(Time.milliseconds(3)) // 允许延时的最大时间
- .sideOutputLateData(lateOutputTag) // 对延时数据进行标记
- .reduce { (v1, v2) => Goods(v1.id, v1.count + v2.count, v2.time) }
- // lateOutputTag 从窗口结果中获取迟到数据局产生的统计结果
- val lateStream = streamFunc.getSideOutput(lateOutputTag)
- stream
- .print()
- streamFunc
- .map(("_________sum: ", _))
- .print()
- lateStream
- .map(("+++++++++++late: ", _))
- .print()
- env.execute(this.getClass.getSimpleName)
- }
- }
input:
- 1,1,0
- 1,1,9
- 1,2,10
- 1,1,5
- 1,2,11
- 1,1,8
- 1,2,13
- 1,1,2
- 1,2,17
- 1,1,3
- 1,3,20
- 1,3,21
output:
- Goods(id=1,count=1,time=0)
- Goods(id=1,count=1,time=9)
- Goods(id=1,count=2,time=10)
- Goods(id=1,count=1,time=5)
- Goods(id=1,count=2,time=11)
- (_________sum: ,Goods(id=1,count=3,time=5))
- Goods(id=1,count=1,time=8)
- (_________sum: ,Goods(id=1,count=4,time=8))
- Goods(id=1,count=2,time=13)
- Goods(id=1,count=1,time=2)
- (_________sum: ,Goods(id=1,count=5,time=2))
- Goods(id=1,count=2,time=17)
- Goods(id=1,count=1,time=3)
- (+++++++++++late: ,Goods(id=1,count=1,time=3))
- Goods(id=1,count=3,time=20)
- Goods(id=1,count=3,time=21)
- (_________sum: ,Goods(id=1,count=8,time=17))
分析:
- 1,1,0 // win1 start
- 1,1,9 // win1 end 注意此时win1 没有关闭
- 1,2,10 // win2 start
- 1,1,5 // win1 这一条数据属于win1无序的数据,此时 watermark=7 < win1.endTime=9.
- 1,2,11 // win2 && win1 触发计算,原因是 watermark=9 >= win1.endTime=9 && win1中有数据。如果没有 allowedLateness(3ms)的话此时就已经关闭 win1 了,但是有延时3ms 所以还没有关闭
- 1,1,8 // win1 由于有 allowedLateness(3ms),这一条数据属于win1无序的数据,并触发 update;而不是 win1的 sideOutput 数据
- 1,2,13 // win2 && win1 处于 close 边缘,win1 真正的生命周期从 [0,9+2) -> [0,9+2+3]
- 1,1,2 // win1 allowedLateness(3ms) 导致 update
- 1,2,17 // win2 && win1 close
- 1,1,3 // win1 此时win1 已经close, 这条数据属于win1 的 sideOutput
- 1,3,20 // win3 start
- 1,3,21 // win3 && win2 触发计算
- // 所以最后的结果:
- win1: 1,5,2 + sideOutPut: 1,1,3
- win2: 1,8,17
- win3: 1,6,21