本文只是以left_join作为举例,right_join,full_join 等是同理的,大家可以自行扩展
1. 实验思路
1.提供两个流
nameStream: 用户名称信息从 9999 端口获取
ageStream: 用户年龄信息从 9998 端口获取
2. 用nameStream left join ageStream ,合并两个流中的信息,输出结果
2. 方法
1【简单】: 使用 coGroup 来进行left join.
2【复杂】:使用 nameStream join (nameStream.union(ageStream)) 后,按 id 分组,判断分组后记录的个数。
偶数时,代表同一个 id 会有两次匹配,一次是 left*left,一次是 left*right,返回非自身匹配的结果即可,也就是 left*right;
奇数时,代表只有一次 left*left,直接返回。注意:这种奇偶判断只在同一窗口内、同一个 id 在两个流中各自至多出现一条记录时才成立。
3.代码
公用对象:

- import java.text.SimpleDateFormat
- import java.util.Date
- import com.alibaba.fastjson.JSON
- import com.alibaba.fastjson.serializer.SerializerFeature
- import scala.beans.BeanProperty
- /** Shared fixtures for the demo: sample names, sample ages and the timestamp formatter. */
- object Const {
- final val names = List("wang", "li", "zhao", "qian", "sun")
- final val ages = List(0, 10, 20, 30)
- // SimpleDateFormat is not thread-safe. A single shared instance can produce corrupted
- // text when two threads format at the same time, so every thread gets its own instance.
- private val dateFormatHolder = new ThreadLocal[SimpleDateFormat] {
- override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- }
- /** Formatter for the pattern `yyyy-MM-dd HH:mm:ss`; the returned instance belongs to the calling thread. */
- def dateFormat: SimpleDateFormat = dateFormatHolder.get()
- }
- // Record type shared by both input streams. Every field is a mutable bean property
- // with a default, so `new People()` yields an empty record that callers fill in.
- case class People(
- @BeanProperty var id: Int = 0, // key the two streams are joined on
- @BeanProperty var name: String = "", // set by the name stream, "" otherwise
- @BeanProperty var age: Int = 0, // set by the age stream, 0 otherwise
- @BeanProperty var eventTime: Long = 0, // event timestamp in epoch milliseconds
- @BeanProperty var eventTimeStr: String = "" // eventTime rendered as text
- ) {
- /** Renders the record as a JSON string via fastjson, with field names quoted. */
- override def toString: String =
- JSON.toJSONString(this, SerializerFeature.QuoteFieldNames)
- }
- /** Factory helpers that stamp a new [[People]] with the time at which it is created. */
- object People {
- /** Builds a record carrying `id` and `name`; `age` keeps its default of 0. */
- def newPeopleByName(id: Int, name: String): People = {
- val (millis, text) = timestampNow()
- People(id = id, name = name, eventTime = millis, eventTimeStr = text)
- }
- /** Builds a record carrying `id` and `age`; `name` keeps its default of "". */
- def newPeopleByAge(id: Int, age: Int): People = {
- val (millis, text) = timestampNow()
- People(id = id, age = age, eventTime = millis, eventTimeStr = text)
- }
- // Current time as (epoch milliseconds, text formatted with Const.dateFormat).
- private def timestampNow(): (Long, String) = {
- val now = new Date()
- (now.getTime, Const.dateFormat.format(now))
- }
- }
People
stream 测试:
- import java.lang
- import org.apache.flink.api.common.functions.{CoGroupFunction, JoinFunction}
- import org.apache.flink.api.scala._
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.scala.function.RichWindowFunction
- import org.apache.flink.streaming.api.watermark.Watermark
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow
- import org.apache.flink.util.Collector
- // Demo job: left-joins a name stream with an age stream in two different ways and
- // prints both results so they can be compared.
- // Lines read from localhost:9999 are parsed as `id,name`, lines from localhost:9998 as `id,age`.
- object DataStreamCoGroup {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
- val WIN = 10 // tumbling window size in seconds
- // Tags that let the union-based join tell which stream a record came from.
- val nameTag = "nameStream"
- val ageTag = "ageStream"
- // id,name
- val nameStream = env
- // .addSource(new NameSource)
- .socketTextStream("localhost", 9999)
- .filter(_.nonEmpty)
- .map(line => {
- val fields = line.split(",")
- People.newPeopleByName(id = fields(0).toInt, name = fields(1))
- })
- .assignTimestampsAndWatermarks(
- // The watermark of this stream trails the largest seen event time by 10 s.
- new BoundedOutOfOrdernessTimestampExtractor[People](Time.seconds(10L)) {
- override def extractTimestamp(element: People): Long = element.eventTime
- })
- // id,age
- val ageStream = env
- // .addSource(new AgeSource)
- .socketTextStream("localhost", 9998)
- .filter(_.nonEmpty)
- .map(line => {
- val fields = line.split(",")
- People.newPeopleByAge(id = fields(0).toInt, age = fields(1).toInt)
- })
- .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[People] {
- // Allowed out-of-orderness in milliseconds (1 s); the watermark trails the
- // largest seen event time by this amount.
- val maxOutOfOrderness = 1000L
- var currentMaxTimestamp: Long = _
- override def getCurrentWatermark: Watermark = {
- new Watermark(currentMaxTimestamp - maxOutOfOrderness)
- }
- override def extractTimestamp(element: People, previousElementTimestamp: Long): Long = {
- currentMaxTimestamp = Math.max(element.eventTime, currentMaxTimestamp)
- element.eventTime
- }
- })
- // Echo both inputs so the raw records are visible next to the join results.
- nameStream
- .map(("name:", _))
- .print()
- ageStream
- .map(("age:", _))
- .print()
- // Approach 1: coGroup. The function receives all records of one id and window from
- // both sides, including the case where the right side is empty.
- nameStream
- .coGroup(ageStream)
- .where(_.id).equalTo(_.id)
- .window(TumblingEventTimeWindows.of(Time.seconds(WIN)))
- .apply(new CoGroupFunction[People, People, People] {
- override def coGroup(first: lang.Iterable[People], second: lang.Iterable[People], out: Collector[People]): Unit = {
- val rightIsEmpty = !second.iterator().hasNext
- first.forEach(left => {
- // No age record for this id: emit the name record unchanged (left-join semantics).
- if (rightIsEmpty) out.collect(left)
- // Otherwise emit one row per left x right combination, carrying the right side's age.
- else second.forEach(right => out.collect(left.copy(age = right.age)))
- })
- }
- })
- .map(("-------- coGroup left Join", _))
- .print()
- // -------------------------------------------
- // Approach 2: window join against (names union ages). A plain window join only emits
- // keys present on both sides, so the name stream is joined with a union containing
- // itself: every name record then has at least its self-pair and is never dropped.
- val nameStream2 = nameStream.map((_, nameTag))
- val ageStream2 = ageStream.map((_, ageTag))
- nameStream2
- .join(nameStream2.union(ageStream2))
- .where(_._1.id)
- .equalTo(_._1.id)
- .window(TumblingEventTimeWindows.of(Time.seconds(WIN)))
- .apply(new JoinFunction[Tuple2[People, String], Tuple2[People, String], Tuple2[People, Boolean]] {
- // Returns the left record plus a flag: true for a name x name pair,
- // false for a name x age pair (in which case the age is filled in).
- override def join(first: (People, String), second: (People, String)): (People, Boolean) = {
- val (left, _) = first
- val (other, source) = second
- if (source.equals(ageTag)) (left.copy(age = other.age), false)
- else (left.copy(), true)
- }
- })
- .keyBy(_._1.id)
- .window(TumblingEventTimeWindows.of(Time.seconds(WIN)))
- .apply(new RichWindowFunction[Tuple2[People, Boolean], People, Int, TimeWindow] {
- override def apply(key: Int, window: TimeWindow, input: Iterable[(People, Boolean)], out: Collector[People]): Unit = {
- val rows = input.toList
- // Rows that were really matched against an age record.
- val matched = rows.collect { case (joined, false) => joined }
- // Inspect the flag instead of the parity of the row count. Parity ("even means
- // matched") is only right when an id occurs once per stream and window; with
- // repeated ids it could drop matches or emit null.
- if (matched.nonEmpty) matched.foreach(row => out.collect(row))
- // Only name x name pairs: n name records produce n * n of them, so collapse the
- // repeats. NOTE(review): name records that are equal in every field collapse too.
- else rows.map(_._1).distinct.foreach(row => out.collect(row))
- }
- })
- .map(("-------- left Join ", _))
- .print()
- env.execute("DataStreamCoGroup")
- }
- }
4. 测试
1. name 9999: [0s,10s)内输入 1,1 和 3,3(格式为 id,name)
- (name:,{"age":0,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
- (name:,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
2. age 9998: [0s,10s)内输入 1,1 和 2,2(格式为 id,age)
- (age:,{"age":1,"eventTime":1602951661454,"eventTimeStr":"2020-10-18 00:21:01","id":1,"name":""})
- (age:,{"age":2,"eventTime":1602951662457,"eventTimeStr":"2020-10-18 00:21:02","id":2,"name":""})
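3. 10s 的窗口结束后,还要等水印越过窗口结束时间才会触发计算:ageStream 的水印延迟是 1s,nameStream 的水印延迟是 10s,而算子取两个流水印中较小的一个。因此需要分别在两个流中再输入任意一条信息来推进水印、触发 window 计算,其中 nameStream 的这条记录要比窗口结束时间晚 10s 以上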
- (name:,{"age":0,"eventTime":1602951682681,"eventTimeStr":"2020-10-18 00:21:22","id":4,"name":"4"})
- (age:,{"age":4,"eventTime":1602951688581,"eventTimeStr":"2020-10-18 00:21:28","id":4,"name":""})
4. 结果:
- (-------- coGroup left Join,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
- (-------- coGroup left Join,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
- (-------- left Join ,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
- (-------- left Join ,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
可以看到,两种方式的结果一致
完整的输出:
- (age:,{"age":1,"eventTime":1602951661454,"eventTimeStr":"2020-10-18 00:21:01","id":1,"name":""})
- (age:,{"age":2,"eventTime":1602951662457,"eventTimeStr":"2020-10-18 00:21:02","id":2,"name":""})
- (name:,{"age":0,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
- (name:,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
- (name:,{"age":0,"eventTime":1602951682681,"eventTimeStr":"2020-10-18 00:21:22","id":4,"name":"4"})
- (age:,{"age":4,"eventTime":1602951688581,"eventTimeStr":"2020-10-18 00:21:28","id":4,"name":""})
- (-------- coGroup left Join,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
- (-------- coGroup left Join,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})
- (-------- left Join ,{"age":1,"eventTime":1602951665626,"eventTimeStr":"2020-10-18 00:21:05","id":1,"name":"1"})
- (-------- left Join ,{"age":0,"eventTime":1602951668775,"eventTimeStr":"2020-10-18 00:21:08","id":3,"name":"3"})