经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
flink 处理实时数据的三重保障
来源:cnblogs  作者:feiquan  时间:2020/11/9 16:08:54  对本文有异议

flink 处理实时数据的三重保障

  1. window+watermark 来处理乱序数据
    对于 TumblingEventTimeWindows window 的元数据startTime,endTime 和程序启动时间无关,当你指定出 window.size 时, window的startTime,endTime就分配好了

  2. allowedLateness 来处理迟到的数据
    相当于延迟了window 的生命周期, 【startTime,endTime) -> [startTime,endTime+ allowedLateness]

  3. sideOutput 是最后的兜底策略, 当window 的生命周期结束后, 延迟的数据可以通过侧输出收集起来,自定义后续的处理流程

测试

  1. 程序
  1. import java.util.Date
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.api.TimeCharacteristic
  4. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
  5. import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
  6. import org.apache.flink.streaming.api.watermark.Watermark
  7. import org.apache.flink.streaming.api.windowing.time.Time
  8. import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
  9. object LastElement {
  10. case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
  11. override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
  12. }
  13. def main(args: Array[String]): Unit = {
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  16. env.setParallelism(1)
  17. // 创建延迟数据 OutputTag, 标记为 late-data
  18. val lateOutputTag = OutputTag[Goods]("late-data")
  19. val stream = env
  20. .socketTextStream("localhost", 9999)
  21. .filter(_.nonEmpty)
  22. .map(x => {
  23. val arr = x.split(",")
  24. Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
  25. })
  26. .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
  27. val maxOutOfOrderness = 2L // 最大无序数据到达的时间,用来生成水印2ms
  28. var currentMaxTimestamp: Long = _
  29. override def getCurrentWatermark: Watermark = {
  30. new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  31. }
  32. override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
  33. currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
  34. element.time
  35. }
  36. })
  37. val streamFunc = stream
  38. .keyBy(_.id)
  39. .timeWindow(Time.milliseconds(10))
  40. .trigger(EventTimeTrigger.create())
  41. .allowedLateness(Time.milliseconds(3)) // 允许延时的最大时间
  42. .sideOutputLateData(lateOutputTag) // 对延时数据进行标记
  43. .reduce { (v1, v2) => Goods(v1.id, v1.count + v2.count, v2.time) }
  44. // lateOutputTag 从窗口结果中获取迟到数据局产生的统计结果
  45. val lateStream = streamFunc.getSideOutput(lateOutputTag)
  46. stream
  47. .print()
  48. streamFunc
  49. .map(("_________sum: ", _))
  50. .print()
  51. lateStream
  52. .map(("+++++++++++late: ", _))
  53. .print()
  54. env.execute(this.getClass.getSimpleName)
  55. }
  56. }

input:

  1. 1,1,0
  2. 1,1,9
  3. 1,2,10
  4. 1,1,5
  5. 1,2,11
  6. 1,1,8
  7. 1,2,13
  8. 1,1,2
  9. 1,2,17
  10. 1,1,3
  11. 1,3,20
  12. 1,3,21

output:

  1. Goods(id=1,count=1,time=0)
  2. Goods(id=1,count=1,time=9)
  3. Goods(id=1,count=2,time=10)
  4. Goods(id=1,count=1,time=5)
  5. Goods(id=1,count=2,time=11)
  6. (_________sum: ,Goods(id=1,count=3,time=5))
  7. Goods(id=1,count=1,time=8)
  8. (_________sum: ,Goods(id=1,count=4,time=8))
  9. Goods(id=1,count=2,time=13)
  10. Goods(id=1,count=1,time=2)
  11. (_________sum: ,Goods(id=1,count=5,time=2))
  12. Goods(id=1,count=2,time=17)
  13. Goods(id=1,count=1,time=3)
  14. (+++++++++++late: ,Goods(id=1,count=1,time=3))
  15. Goods(id=1,count=3,time=20)
  16. Goods(id=1,count=3,time=21)
  17. (_________sum: ,Goods(id=1,count=8,time=17))

分析:

  1. 1,1,0 // win1 start
  2. 1,1,9 // win1 end 注意此时win1 没有关闭
  3. 1,2,10 // win2 start
  4. 1,1,5 // win1 这一条数据属于win1无序的数据,此时 watermark=7 < win1.endTime=9.
  5. 1,2,11 // win2 && win1 触发计算,原因是 watermark=9 >= win1.endTime=9 && win1中有数据。如果没有 allowedLateness(3ms)的话此时就已经关闭 win1 了,但是有延时3ms 所以还没有关闭
  6. 1,1,8 // win1 由于有 allowedLateness(3ms),这一条数据属于win1无序的数据,并触发 update;而不是 win1的 sideOutput 数据
  7. 1,2,13 // win2 && win1 处于 close 边缘,win1 真正的生命周期从 [0,9+2) -> [0,9+2+3]
  8. 1,1,2 // win1 allowedLateness(3ms) 导致 update
  9. 1,2,17 // win2 && win1 close
  10. 1,1,3 // win1 此时win1 已经close, 这条数据属于win1 的 sideOutput
  11. 1,3,20 // win3 start
  12. 1,3,21 // win3 && win2 触发计算
  13. // 所以最后的结果:
  14. win1: 1,5,2 + sideOutPut: 1,1,3
  15. win2: 1,8,17
  16. win3: 1,6,21

原文链接:http://www.cnblogs.com/feiquan/p/13842149.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号