Two ways to implement a left join of two DataStreams in Flink
Source: cnblogs  Author: feiquan  Date: 2020/11/9 16:08:56

This article uses left join as the example. Right join, full join and so on work the same way, and you can extend the code yourself.
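The "works the same way" claim is easiest to see for a single key. coGroup hands the user function every record of one key and one window from both sides, and calls it with one empty side when the key appears in only one stream; the join kind then only decides which one-sided groups are kept. The sketch below models that per-key step in plain Scala (no Flink). `CoGroupJoinKinds`, `JoinKind` and `joinKey` are hypothetical names, and `Option` marks a missing side.

```scala
// Hypothetical model (not Flink API) of the per-key body of a coGroup,
// generalized from left join to right and full outer join.
object CoGroupJoinKinds {
  sealed trait JoinKind
  case object LeftJoin extends JoinKind
  case object RightJoin extends JoinKind
  case object FullJoin extends JoinKind

  /** lefts/rights: all records of one key in one window, as coGroup receives them. */
  def joinKey[L, R](lefts: Seq[L], rights: Seq[R], kind: JoinKind): Seq[(Option[L], Option[R])] =
    if (lefts.nonEmpty && rights.nonEmpty)
      for (l <- lefts; r <- rights) yield (Option(l), Option(r)) // both sides present: cross product
    else kind match {
      // Only one side present: keep it only if the join kind preserves that side
      case LeftJoin | FullJoin if lefts.nonEmpty   => lefts.map(l => (Option(l), Option.empty[R]))
      case RightJoin | FullJoin if rights.nonEmpty => rights.map(r => (Option.empty[L], Option(r)))
      case _                                       => Seq.empty
    }

  def main(args: Array[String]): Unit = {
    println(joinKey(Seq("n3"), Seq.empty[Int], LeftJoin))   // left-only key survives a left join
    println(joinKey(Seq.empty[String], Seq(2), LeftJoin))   // right-only key is dropped
    println(joinKey(Seq.empty[String], Seq(2), FullJoin))   // ...but kept by a full join
  }
}
```

In the Flink job this corresponds to adding an `else` branch that emits the right-hand records when `first` is empty (full join), or swapping the roles of `first` and `second` (right join).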

 

1. Approach

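  1. Provide two streams:

            nameStream: user names, one "id,name" line at a time from port 9999

            ageStream: user ages, one "id,age" line at a time from port 9998

  2. Left join nameStream with ageStream, merge the information from both streams, and print the result.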

 

2. Methods

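     1 [simple]: use coGroup to perform the left join.

     2 [complex]: join nameStream with (nameStream.union(ageStream)), then group the join results by id and count the records in each group.

                    Even count: the id was matched twice, once as left*left (the name record with itself) and once as left*right; return the match that is not with itself, i.e. left*right.

                    Odd count: there was only the left*left match, so return it directly.

                    This rule assumes each id occurs at most once per stream within a window. An id with n name records and m age records produces n*(n+m) matches, so one name with two ages gives 3 (odd, and the age is lost), and two names with no age give 4 (even, with no left*right match to return).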
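The parity rule can be checked case by case with a plain-Scala model of the join step of method 2 for one window (no Flink involved). Records are reduced to an id plus the tag saying which stream they came from, as `nameStream2`/`ageStream2` do in the code. `ParityCheck`, `Rec`, `pairCounts` and `parityClaimsMatch` are hypothetical names used only for this illustration.

```scala
// Hypothetical model of method 2 for a single window (no Flink).
object ParityCheck {
  final case class Rec(id: Int, source: String) // source is "nameStream" or "ageStream"

  /** Join names with (names ++ ages) on id and count the matches per id. */
  def pairCounts(names: Seq[Rec], ages: Seq[Rec]): Map[Int, Int] = {
    val union = names ++ ages
    val pairs = for (l <- names; r <- union if l.id == r.id) yield (l, r)
    pairs.groupBy(_._1.id).map { case (id, ps) => id -> ps.size }
  }

  /** The article's rule: an even count means a left*right match exists. */
  def parityClaimsMatch(count: Int): Boolean = count % 2 == 0

  def main(args: Array[String]): Unit = {
    val names = Seq(1, 3, 5, 6, 6).map(Rec(_, "nameStream")) // id 6 has two name records
    val ages  = Seq(1, 5, 5).map(Rec(_, "ageStream"))         // id 5 has two age records
    for ((id, n) <- pairCounts(names, ages).toSeq.sortBy(_._1)) {
      val hasAge = ages.exists(_.id == id)
      println(s"id=$id matches=$n parity says match=${parityClaimsMatch(n)} has age=$hasAge")
    }
  }
}
```

Ids 1 and 3 behave as the rule expects. Id 5 (one name, two ages) gets an odd count although it has ages, and id 6 (two names, no age) gets an even count although it has none, which is why the job should only be fed one record per id per stream and window.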

 

3. Code

Shared objects:

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature

import scala.beans.BeanProperty

object Const {
  final val names = List("wang", "li", "zhao", "qian", "sun")
  final val ages = List(0, 10, 20, 30)
  // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat. This matters because
  // the nameStream and ageStream map functions run in different task threads and share this object.
  final val dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault())
}

// One record type for both streams: nameStream fills `name`, ageStream fills `age`.
// @BeanProperty generates getters/setters so that fastjson can serialize the fields.
case class People(
    @BeanProperty var id: Int = 0,
    @BeanProperty var name: String = "",
    @BeanProperty var age: Int = 0,
    @BeanProperty var eventTime: Long = 0, // event time in epoch milliseconds
    @BeanProperty var eventTimeStr: String = ""
) {
  override def toString: String =
    JSON.toJSONString(this, SerializerFeature.QuoteFieldNames)
}

object People {
  // Name record, stamped with the current wall-clock time as its event time
  def newPeopleByName(id: Int, name: String): People = {
    val now = Instant.now()
    People(id = id, name = name, eventTime = now.toEpochMilli, eventTimeStr = Const.dateFormat.format(now))
  }

  // Age record, stamped with the current wall-clock time as its event time
  def newPeopleByAge(id: Int, age: Int): People = {
    val now = Instant.now()
    People(id = id, age = age, eventTime = now.toEpochMilli, eventTimeStr = Const.dateFormat.format(now))
  }
}
```

 

Stream test:

```scala
import java.lang

import org.apache.flink.api.common.functions.{CoGroupFunction, JoinFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
// Streaming Scala API: StreamExecutionEnvironment, DataStream and the implicit TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object DataStreamCoGroup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val WIN = 10 // window size in seconds

    // Input lines: id,name
    val nameStream = env
      // .addSource(new NameSource)
      .socketTextStream("localhost", 9999)
      .filter(_.nonEmpty)
      .map(x => {
        val arr = x.split(",")
        People.newPeopleByName(id = arr(0).toInt, name = arr(1))
      })
      // Watermark = highest event time seen minus 10 s
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[People](Time.seconds(10L)) {
          override def extractTimestamp(element: People): Long = element.eventTime
        })

    // Input lines: id,age
    val ageStream = env
      // .addSource(new AgeSource)
      .socketTextStream("localhost", 9998)
      .filter(_.nonEmpty)
      .map(x => {
        val arr = x.split(",")
        People.newPeopleByAge(id = arr(0).toInt, age = arr(1).toInt)
      })
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[People] {
        val maxOutOfOrderness = 1000L // maximum out-of-orderness used for the watermark: 1000 ms = 1 s
        var currentMaxTimestamp: Long = _
        override def getCurrentWatermark: Watermark =
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        override def extractTimestamp(element: People, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.eventTime, currentMaxTimestamp)
          element.eventTime
        }
      })

    nameStream.map(("name:", _)).print()
    ageStream.map(("age:", _)).print()

    // Method 1: coGroup. Per id and window the function gets all records of both sides,
    // and is also called when the id exists on only one side.
    nameStream
      .coGroup(ageStream)
      .where(_.id).equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(WIN)))
      .apply(new CoGroupFunction[People, People, People] {
        override def coGroup(first: lang.Iterable[People], second: lang.Iterable[People], out: Collector[People]): Unit = {
          val rights = second.asScala
          for (left <- first.asScala) {
            if (rights.isEmpty) out.collect(left) // no age record: keep the left row, age stays 0
            else rights.foreach { right =>
              val data = left.copy()
              data.age = right.age
              out.collect(data)
            }
          }
        }
      })
      .map(("-------- coGroup left Join", _))
      .print()

    // -------------------------------------------
    // Method 2: nameStream join (nameStream union ageStream); each record is tagged with its source
    val nameStream2 = nameStream.map((_, "nameStream"))
    val ageStream2 = ageStream.map((_, "ageStream"))

    nameStream2
      .join(nameStream2.union(ageStream2))
      .where(_._1.id)
      .equalTo(_._1.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(WIN)))
      .apply(new JoinFunction[(People, String), (People, String), (People, Boolean)] {
        override def join(first: (People, String), second: (People, String)): (People, Boolean) = {
          val data = first._1.copy()
          var same = true // true when the name record was joined with itself
          if (second._2 == "ageStream") {
            same = false
            data.age = second._1.age
          }
          (data, same)
        }
      })
      .keyBy(_._1.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(WIN)))
      .apply(new RichWindowFunction[(People, Boolean), People, Int, TimeWindow] {
        // Assumes each id occurs at most once per stream within a window;
        // otherwise the parity test below misclassifies the group.
        override def apply(key: Int, window: TimeWindow, input: Iterable[(People, Boolean)], out: Collector[People]): Unit = {
          var len = 0
          var same: People = null
          var unSame: People = null
          for (i <- input) {
            len += 1
            if (i._2) same = i._1.copy() // a1 joined with a1
            else unSame = i._1.copy()    // a1 joined with a2
          }
          if (len % 2 == 0) out.collect(unSame) // even: return a1,a2
          else out.collect(same)                // odd: return a1,a1
        }
      })
      .map(("-------- left Join ", _))
      .print()

    env.execute("DataStreamCoGroup")
  }
}
```
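As a reference for what the streaming job should print, the sketch below replays method 1 on plain Scala collections. It assumes 10 s tumbling windows aligned to the epoch (what `TumblingEventTimeWindows.of(Time.seconds(10))` does with no offset) and reuses the event times printed in the test run in section 4. `CoGroupLeftJoinModel` and `P` are hypothetical stand-ins for the job and the `People` class.

```scala
// Hypothetical batch model of method 1 (no Flink): expected left-join output per 10 s window.
object CoGroupLeftJoinModel {
  // Stand-in for People with only the fields the join touches
  final case class P(id: Int, name: String = "", age: Int = 0, eventTime: Long = 0L)

  val WindowMs = 10000L

  // Start of the epoch-aligned tumbling window that contains ts
  def windowStart(ts: Long): Long = ts - (ts % WindowMs)

  /** Per (window, id): copy each name record once per matching age, or keep it unchanged. */
  def leftJoin(names: Seq[P], ages: Seq[P]): Seq[(Long, P)] = {
    val agesByKey = ages.groupBy(a => (windowStart(a.eventTime), a.id))
    for {
      left   <- names
      key    = (windowStart(left.eventTime), left.id)
      rights = agesByKey.getOrElse(key, Seq.empty[P])
      joined <- if (rights.isEmpty) Seq(left) else rights.map(r => left.copy(age = r.age))
    } yield (key._1, joined)
  }

  // Event times taken from the test output
  val names = Seq(
    P(1, name = "1", eventTime = 1602951665626L),
    P(3, name = "3", eventTime = 1602951668775L),
    P(4, name = "4", eventTime = 1602951682681L))
  val ages = Seq(
    P(1, age = 1, eventTime = 1602951661454L),
    P(2, age = 2, eventTime = 1602951662457L),
    P(4, age = 4, eventTime = 1602951688581L))

  def main(args: Array[String]): Unit =
    leftJoin(names, ages).foreach { case (start, p) => println(s"window $start -> $p") }
}
```

In this model the first two rows correspond to the coGroup results in the test; the id 4 row belongs to the next window, which the test run never triggered. Unlike the model, Flink does not guarantee the output order across different ids.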

 

4. Test

   1. name 9999: within [0s,10s), enter (1,1),(3,3):

```
(name:,{"age":0,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
(name:,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
```

 

   2. age 9998: within [0s,10s), enter (1,1),(2,2):

```
(age:,{"age":1,"eventTime":1602951661454,"eventTimeStr":"2020-10-18 00:21:01","id":1,"name":""})
(age:,{"age":2,"eventTime":1602951662457,"eventTimeStr":"2020-10-18 00:21:02","id":2,"name":""})
```

 

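   3. After the 10 s window has ended, type any record into each of the two streams. This only serves to advance the watermarks so that the window computation is triggered. The windowed operators follow the smaller of the two input watermarks: ageStream's watermark trails its latest event time by 1 s, but nameStream's trails by 10 s, so the new name record has to arrive roughly 10 s after the window end (00:21:22 here, for a window ending at 00:21:10).

```
(name:,{"age":0,"eventTime":1602951682681,"eventTimeStr":"2020-10-18 00:21:22","id":4,"name":"4"})
(age:,{"age":4,"eventTime":1602951688581,"eventTimeStr":"2020-10-18 00:21:28","id":4,"name":""})
```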

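   4. Result:

```
(-------- coGroup left Join,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
(-------- coGroup left Join,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
(-------- left Join ,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
(-------- left Join ,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
```

    Both methods give the same result: id 1 is joined with age 1, id 3 has no age record and keeps the default age 0, and id 2, which exists only in ageStream, does not appear, as expected for a left join. The id 4 records fall into the next window, which has not fired yet.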
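Why the first window fired in step 3 can be checked with a little arithmetic. The sketch below is plain Scala with hypothetical names (`WatermarkCheck`, `watermark`, `fires`). It applies the two watermark strategies from the code (10 s bound on nameStream, 1 s on ageStream) to the printed event times, takes the minimum as a two-input window operator does, and compares it with the window's last timestamp, which is when Flink's event-time trigger fires.

```scala
// Hypothetical arithmetic check (no Flink) of which 10 s windows have fired after step 3.
object WatermarkCheck {
  val WindowMs = 10000L

  // Start of the epoch-aligned tumbling window that contains ts
  def windowStart(ts: Long): Long = ts - (ts % WindowMs)

  /** Bounded out-of-orderness watermark: highest event time seen minus the bound. */
  def watermark(timestamps: Seq[Long], boundMs: Long): Long = timestamps.max - boundMs

  /** Window [start, start + size) fires once the watermark reaches start + size - 1. */
  def fires(start: Long, wm: Long): Boolean = wm >= start + WindowMs - 1

  val nameTs = Seq(1602951665626L, 1602951668775L, 1602951682681L)
  val ageTs  = Seq(1602951661454L, 1602951662457L, 1602951688581L)

  val nameWm = watermark(nameTs, 10000L) // BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10L))
  val ageWm  = watermark(ageTs, 1000L)   // maxOutOfOrderness = 1000L
  val opWm   = math.min(nameWm, ageWm)   // the window operator follows the slower input

  // Window results carry the window's max timestamp, so method 2's join output
  // lands in the same 10 s window of the second keyBy/window step.
  val joinOutputTs = windowStart(nameTs.head) + WindowMs - 1

  def main(args: Array[String]): Unit = {
    val first  = windowStart(nameTs.head) // window with the id 1, 2, 3 records
    val second = windowStart(nameTs.last) // window with the id 4 records
    println(s"operator watermark = $opWm")
    println(s"first window fires: ${fires(first, opWm)}, second window fires: ${fires(second, opWm)}")
  }
}
```

The name stream's 10 s bound is what holds the operator back here; with ageStream alone, the age record at 00:21:28 would already have pushed the watermark past both windows.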

 

Full output:

```
(age:,{"age":1,"eventTime":1602951661454,"eventTimeStr":"2020-10-18 00:21:01","id":1,"name":""})
(age:,{"age":2,"eventTime":1602951662457,"eventTimeStr":"2020-10-18 00:21:02","id":2,"name":""})
(name:,{"age":0,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
(name:,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
(name:,{"age":0,"eventTime":1602951682681,"eventTimeStr":"2020-10-18 00:21:22","id":4,"name":"4"})
(age:,{"age":4,"eventTime":1602951688581,"eventTimeStr":"2020-10-18 00:21:28","id":4,"name":""})
(-------- coGroup left Join,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
(-------- coGroup left Join,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
(-------- left Join ,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
(-------- left Join ,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
```

 

Original article: http://www.cnblogs.com/feiquan/p/13833256.html
