Two Ways to Connect Spark Streaming to Flume
Source: cnblogs  Author: zhuzhu&you  Date: 2021/4/12 9:35:55

Spark Streaming can connect to Flume in two ways: Push and Pull. Viewed from the Spark Streaming side, in Push mode Flume pushes data to Spark, while in Pull mode Spark pulls the data that Flume has output.

I haven't fully figured out having Flume push data to Spark Streaming; could anyone more experienced point me in the right direction?

Many thanks!
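For reference, here is a minimal sketch of what the push approach usually looks like, built on FlumeUtils.createStream from the same spark-streaming-flume package (as opposed to the createPollingStream used in the pull example below). The host, port, and object name here are illustrative assumptions, not something tested in this post:

package day02

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Push mode: Spark Streaming runs an Avro receiver, and Flume pushes events
// into it through an ordinary avro sink (not SparkSink), e.g.:
//   a1.sinks.k1.type = avro
//   a1.sinks.k1.hostname = 192.168.137.88   (the host running this receiver)
//   a1.sinks.k1.port = 8888
object StreamingFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlumePush")
    val ssc = new StreamingContext(conf, Seconds(5))
    // createStream = push: the receiver listens and Flume connects to it
    val stream = FlumeUtils.createStream(ssc, "192.168.137.88", 8888, StorageLevel.MEMORY_AND_DISK_2)
    stream.map(e => new String(e.event.getBody.array()))
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // The receiver must be up before the Flume agent starts,
    // otherwise the avro sink fails to connect
    ssc.start()
    ssc.awaitTermination()
  }
}

Because the receiver owns the socket in push mode, Flume loses its connection whenever the Spark job is down; the pull mode described next, where Flume buffers events in its own SparkSink until Spark fetches them, gives stronger delivery guarantees.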

1. Spark pulling data from Flume (Pull mode):

Copy the two required jars into flume/lib (for Spark 2.x these are typically the SparkSink jar, spark-streaming-flume-sink_2.11-<version>.jar, plus a matching scala-library-<version>.jar; the exact names depend on your Spark and Scala versions).

Otherwise Flume throws these two exceptions:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink

java.lang.IllegalStateException: begin() called when transaction is OPEN!

2. Write the Flume agent configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# Host and port of this machine, where SparkSink will listen for Spark's polls
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Write the Spark Streaming program:

package day02

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @ClassName: StreamingFlume
 * @Description TODO monitor Flume in real time and word-count the data it produces
 * @Author: Charon
 * @Date: 2021/4/7 13:19
 * @Version 1.0
 **/
object StreamingFlume {

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")
    // 2. Create the SparkContext object
    val sc = new SparkContext(conf)
    // Limit log output to warnings and errors (setting it here had no effect for me)
    //sc.setLogLevel("WARN")
    // 3. Create the StreamingContext; Seconds(5) is the batch interval, i.e. how often a batch is processed
    val ssc = new StreamingContext(sc, Seconds(5))
    // 4. Define the set of Flume sink addresses; several agents can be polled by adding
    //    more entries, each created with new. Host and port must match
    //    a1.sinks.k1.hostname / a1.sinks.k1.port in the Flume config above
    val addresses = Seq(new InetSocketAddress("192.168.137.88", 9999))
    // 5. Pull the data buffered in Flume's SparkSink
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)
    // 6. Extract the event body; each Flume event looks like {"header": xxxxx, "body": xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    lineDstream.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
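One detail the post leaves implicit: the program needs the spark-streaming-flume artifact on its classpath. A hedged build.sbt sketch, assuming Spark 2.x on Scala 2.11 (the version numbers are assumptions; match them to your cluster):

// build.sbt -- version numbers are assumptions, align them with your Spark install
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "2.4.8"
)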

4. Start the Flume agent and the Spark Streaming program:

Upload a file into the monitored directory and the word counts appear in the console; a rough command sketch follows.
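Roughly, assuming the configuration above is saved as conf/spark-pull.conf under the Flume home directory (the file name, paths, and sample file are placeholders):

# Start the agent; --name must match the agent name a1 used in the config
bin/flume-ng agent --conf conf --conf-file conf/spark-pull.conf --name a1 -Dflume.root.logger=INFO,console

# With the StreamingFlume program running, drop a file into the spooled
# directory from another shell to generate events
cp words.txt /home/zhuzhu/apps/flumeSpooding/

Each 5-second batch should then print the word counts for the new file's contents to the Spark console.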


Original link: http://www.cnblogs.com/zhuzhu-you/p/14629308.html
