Flink: how to checkpoint the watermark to keep sink data from being overwritten
Source: cnblogs  Author: feiquan  Date: 2020/11/9 16:08:53

Data written by the Flink sink got overwritten???

Production always throws all kinds of inexplicable data at you, and a single oversight is enough to wreck everything.



In production the stream sinks to Elasticsearch (ES), and the ES document id is a custom composite: id + windowStartTime.

Assume a window size of 10 min and a maximum watermark delay (out-of-orderness bound) of 10 s. Event times arrive out of order, and a record can be delayed by up to 30 min.
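Because the document id embeds the window start, every record that falls into the same 10-minute window maps to the same ES document. A minimal pure-Scala sketch of that mapping, assuming tumbling windows with zero offset and times given as milliseconds since midnight; `windowStart` uses the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset`, while `docId` and its `_` separator are hypothetical, not the author's exact id format.

```scala
object WindowDocId {
  // 10-minute tumbling window, in milliseconds
  val WindowSizeMs: Long = 10 * 60 * 1000L

  // Helper for the examples: milliseconds since midnight for hh:mm:ss
  def at(h: Int, m: Int, s: Int = 0): Long = ((h * 60L + m) * 60L + s) * 1000L

  // Start of the tumbling window containing eventTimeMs (offset = 0),
  // same formula as TimeWindow.getWindowStartWithOffset(t, 0, size)
  def windowStart(eventTimeMs: Long, sizeMs: Long = WindowSizeMs): Long =
    eventTimeMs - (eventTimeMs + sizeMs) % sizeMs

  // Hypothetical ES document id: business id + window start
  def docId(id: Int, eventTimeMs: Long): String = s"${id}_${windowStart(eventTimeMs)}"

  def main(args: Array[String]): Unit = {
    // Both records fall into [09:40, 09:50), so they share one document id
    println(docId(1, at(9, 45, 2)))  // 1_34800000
    println(docId(1, at(9, 49, 59))) // 1_34800000
  }
}
```

Any result recomputed for the same id and window therefore lands on the same document, which is exactly what makes a second firing of an old window overwrite the earlier result.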

Watermark assigner (in the test code the out-of-orderness bound is shortened to 2 ms):

```scala
// Applied to a DataStream[Goods]; see the full program below for imports and context
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
  val maxOutOfOrderness = 2L // maximum out-of-orderness used to derive the watermark: 2 ms
  var currentMaxTimestamp: Long = _ // largest event time seen so far; starts at 0
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") // SSS = milliseconds

  // Called periodically: watermark = largest event time seen - allowed delay
  override def getCurrentWatermark: Watermark = {
    println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }

  // Called per record: Goods.time is the event time; track the maximum
  override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
    currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
    element.time
  }
})
```
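The assigner's arithmetic can be checked without a cluster. The pure-Scala model below (the class name `PeriodicAssignerModel` is hypothetical; it reproduces only the max-timestamp bookkeeping, not Flink's periodic emission) shows why the first logged watermark in the tests is -2: a Scala field declared as `var currentMaxTimestamp: Long = _` starts at 0, so before any record arrives the watermark is 0 - 2.

```scala
// Hypothetical stand-in for the AssignerWithPeriodicWatermarks above:
// tracks the largest event time seen and lags it by maxOutOfOrderness.
class PeriodicAssignerModel(val maxOutOfOrderness: Long) {
  // Same initial value as `var currentMaxTimestamp: Long = _`: 0
  var currentMaxTimestamp: Long = 0L

  // Per record (extractTimestamp): the maximum only moves forward
  def extractTimestamp(eventTime: Long): Long = {
    currentMaxTimestamp = math.max(eventTime, currentMaxTimestamp)
    eventTime
  }

  // Periodically (getCurrentWatermark)
  def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrderness
}

object PeriodicAssignerModelDemo {
  def main(args: Array[String]): Unit = {
    val a = new PeriodicAssignerModel(2L)
    println(a.currentWatermark) // -2, before any input
    a.extractTimestamp(10L)
    println(a.currentWatermark) // 8
    a.extractTimestamp(5L)      // an out-of-order record
    println(a.currentWatermark) // still 8
  }
}
```

Since `currentMaxTimestamp` is an ordinary field, a fresh instance after a restart starts again from 0, which is the behavior the first test below exposes.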


Suppose it is now 10:15, so the current window is [10:10, 10:20). That means the results for the windows starting at 09:40, 09:50 and 10:00 have already been emitted.

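At this point the job throws an exception. Checkpointing and a restart strategy are configured, so after the restart does the watermark resume from where it left off? Is the window still [10:10, 10:20)?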

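No. The watermark is not part of the checkpoint: currentMaxTimestamp goes back to its initial value 0, so the watermark grows again from 0 and windowing effectively starts over.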

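After the restart, suppose the first record unluckily has event time 09:45:02. The watermark then becomes 09:44:52 (09:45:02 minus the 10 s delay), so the record is no longer considered late and is assigned to window [09:40, 09:50), which starts aggregating again. Some time later this window fires and produces another ES document with id id+09:40; when it is sunk, it overwrites the document written before the failure, and the new result is built only from records that arrived after the restart.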
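The failure mode comes down to a lateness check. A simplified pure-Scala sketch, assuming allowedLateness = 0, the 10 s out-of-orderness bound, and times as milliseconds since midnight; `isLate` mirrors the rule Flink's window operator applies (a window is late once its last timestamp, end - 1, is at or below the current watermark), and all names are hypothetical. The same 09:45:02 record is dropped before the failure but accepted after the watermark is reset.

```scala
object RestartOverwriteDemo {
  val WindowSizeMs: Long = 10 * 60 * 1000L // 10-minute tumbling window
  val MaxDelayMs: Long = 10 * 1000L        // 10 s out-of-orderness bound

  // Milliseconds since midnight for hh:mm:ss
  def at(h: Int, m: Int, s: Int = 0): Long = ((h * 60L + m) * 60L + s) * 1000L

  // Tumbling-window start with zero offset
  def windowStart(t: Long): Long = t - (t + WindowSizeMs) % WindowSizeMs

  // Simplified window-operator rule with allowedLateness = 0:
  // the record's window is late if its max timestamp (end - 1) <= watermark
  def isLate(eventTime: Long, watermark: Long): Boolean =
    windowStart(eventTime) + WindowSizeMs - 1 <= watermark

  def main(args: Array[String]): Unit = {
    val event = at(9, 45, 2)

    // Before the failure the largest event time seen is about 10:15
    val wmBefore = at(10, 15) - MaxDelayMs
    println(s"before restart, late = ${isLate(event, wmBefore)}") // true: record dropped

    // After the restart currentMaxTimestamp is 0 again; the watermark can reach
    // at most 09:44:52 once this record has been seen
    val wmAfter = math.max(0L, event) - MaxDelayMs
    println(s"after restart, late = ${isLate(event, wmAfter)}") // false: [09:40, 09:50) reopens
  }
}
```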

Test:

```
2020-10-21 23:57:01.001 -------watermark: -2
input:Goods(id=1,count=10,time=10) // input: 1,10,10
()
2020-10-21 23:57:01.001 -------watermark: 8
....
2020-10-21 23:57:04.004 -------watermark: 8
// input: 0,0,0 triggers the exception; the job restarts
2020-10-21 23:57:09.009 -------watermark: -2 // the watermark starts over
....
2020-10-21 23:57:17.017 -------watermark: -2
input:Goods(id=1,count=10,time=10)
()
2020-10-21 23:57:17.017 -------watermark: 8
...
```

Solution:

The currentMaxTimestamp here is essentially operator state, so it can be saved in checkpoints by implementing the CheckpointedFunction or ListCheckpointed interface.

The modified watermark assigner:

```scala
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
  val maxOutOfOrderness = 2L // maximum out-of-orderness used to derive the watermark: 2 ms
  var currentMaxTimestamp: Long = _ // largest event time seen so far; now saved as operator state
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  override def getCurrentWatermark: Watermark = {
    println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }

  override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
    currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
    element.time
  }

  // On every checkpoint: store the current max event time
  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
    Collections.singletonList(currentMaxTimestamp)
  }

  // On restore: take the smallest stored value (several entries if subtasks were merged);
  // the list can be empty after scaling up, and min would throw on it
  override def restoreState(state: util.List[JavaLong]): Unit = {
    if (!state.isEmpty) {
      val stateMin = state.asScala.min
      if (stateMin > 0) currentMaxTimestamp = stateMin
    }
  }
})
```
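What the two `ListCheckpointed` methods do on a checkpoint/restore round trip can be sketched in plain Scala. This is a hypothetical model, not Flink code: each parallel subtask snapshots a one-element list, and on restore the entries are redistributed among the new subtasks, so one subtask may receive several entries (scale-down) or none (scale-up). Taking the minimum is the conservative choice, since it never moves the watermark past what any old subtask had seen, and the model skips an empty list because `min` throws on an empty collection.

```scala
import java.{util => ju}
import java.lang.{Long => JavaLong}
import scala.collection.JavaConverters._

// Hypothetical model of the ListCheckpointed contract used above
class CheckpointedMaxTimestamp {
  var currentMaxTimestamp: Long = 0L

  // snapshotState: one entry per subtask
  def snapshotState(): ju.List[JavaLong] =
    ju.Collections.singletonList(JavaLong.valueOf(currentMaxTimestamp))

  // restoreState: entries may come from several old subtasks, or be absent
  def restoreState(state: ju.List[JavaLong]): Unit = {
    val values = state.asScala.map(_.longValue)
    if (values.nonEmpty) {
      val stateMin = values.min
      if (stateMin > 0) currentMaxTimestamp = stateMin
    }
  }
}

object CheckpointedMaxTimestampDemo {
  def main(args: Array[String]): Unit = {
    // Scale-down: two old subtasks (max 20 and 15) restored into one
    val restored = new CheckpointedMaxTimestamp
    restored.restoreState(ju.Arrays.asList(JavaLong.valueOf(20L), JavaLong.valueOf(15L)))
    println(restored.currentMaxTimestamp) // 15
  }
}
```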

Test:

```
2020-10-22 00:39:00.000 -------watermark: -2
input:Goods(id=1,count=10,time=10) // input: 1,10,10
()
2020-10-22 00:39:00.000 -------watermark: 8
...
2020-10-22 00:39:03.003 -------watermark: 8
input:Goods(id=0,count=0,time=0) // input: 0,0,0 triggers the exception; the job restarts
2020-10-22 00:39:08.008 -------watermark: 8 // state restored from the checkpoint
...
2020-10-22 00:39:23.023 -------watermark: 8
input:Goods(id=1,count=20,time=20) // input: 1,20,20
()
2020-10-22 00:39:23.023 -------watermark: 18
....
```

Full test program:

```scala
import java.util.{Collections, Date}
import java.util
import scala.collection.JavaConverters._
import java.lang.{Long => JavaLong}
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

/**
 * Checkpoints the watermark assigner's max event time so the watermark survives a restart.
 * Input: one record per line on localhost:9999, formatted as id,count,time
 */
object WatermarkCheckpoint {
  case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
    override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
  }

  def main(args: Array[String]): Unit = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") // SSS = milliseconds
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000 * 10) // checkpoint every 10 s
    env.getCheckpointConfig.setCheckpointTimeout(1000 * 60) // checkpoint timeout
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 5) // minimum pause between two checkpoints
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // checkpointing mode
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // max concurrent checkpoints
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // keep the checkpoint when the job is cancelled
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // a failed checkpoint does not fail the job
    // restart strategy: at most 2 restarts, 5 s apart
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS))
    )
    // state backend
    val file_rocksdb = "file:///tmp/state/rocksdb" // create this directory beforehand
    env.setStateBackend(new RocksDBStateBackend(file_rocksdb, true)) // true = incremental checkpoints
    env.setParallelism(1)
    env.socketTextStream("localhost", 9999)
      .filter(_.nonEmpty)
      .map(x => {
        val arr = x.split(",")
        val g = Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
        println(s"input:$g")
        g
      })
      // watermark WITHOUT checkpoint
      /*.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
        val maxOutOfOrderness = 2L // maximum out-of-orderness used to derive the watermark: 2 ms
        var currentMaxTimestamp: Long = _
        override def getCurrentWatermark: Watermark = {
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }
        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }
      })*/
      // watermark WITH checkpoint
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
        val maxOutOfOrderness = 2L // maximum out-of-orderness used to derive the watermark: 2 ms
        var currentMaxTimestamp: Long = _
        override def getCurrentWatermark: Watermark = {
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }
        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }
        // save the max event time on every checkpoint
        override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
          Collections.singletonList(currentMaxTimestamp)
        }
        // restore it after a restart (the list may be empty after scaling up)
        override def restoreState(state: util.List[JavaLong]): Unit = {
          if (!state.isEmpty) {
            val stateMin = state.asScala.min
            if (stateMin > 0) currentMaxTimestamp = stateMin
          }
        }
      })
      .map(x => {
        if (x.id == 0) throw new RuntimeException("id is 0") // input with id 0 simulates a failure
      })
      .print()
    env.execute(this.getClass.getSimpleName)
  }
}
```


Original article: http://www.cnblogs.com/feiquan/p/13853105.html
