经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
SparkStreaming整合flume
来源:cnblogs  作者:强行快乐~  时间:2019/7/23 8:35:54  对本文有异议

SparkStreaming整合flume

在实际开发中push会丢数据,因为push是由flume将数据发给程序,程序出错,丢失数据。所以不会使用不做讲解,这里讲解poll,拉去flume的数据,保证数据不丢失。

1.首先你得有flume

比如你有:【如果没有请走这篇:搭建flume集群(待定)

这里使用的flume的版本是apache1.6 cdh公司集成

这里需要下载

(1).我这里是将spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下

 

  1. cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib

  (ps:我的flume安装目录,使用ftp工具上传上去,我使用的是finalShell支持ssh也支持ftp(需要的小伙伴,点我下载))

 

(2)修改flume/lib下的scala依赖包(保证版本一致)

我这里是将spark中jar安装路径的scala-library-2.11.8.jar替换掉flume下的scala-library-2.10.5.jar

 

删除scala-library-2.10.5.jar

  1. rm -rf /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib/scala-library-2.10.5.jar

复制scala-library-2.11.8.jar

  1. cp /export/servers/spark-2.0.2/jars/scala-library-2.11.8.jar /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib/

 

(3)编写flume-poll.conf文件

创建目录

  1. mkdir /export/data/flume

创建配置文件

  1. vim /export/logs/flume-poll.conf

 

编写配置,标注发绿光的地方需要注意更改为自己本机的(flume是基于配置执行任务)

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #source
  5. a1.sources.r1.channels = c1
  6. a1.sources.r1.type = spooldir
  7. a1.sources.r1.spoolDir = /export/data/flume
  8. a1.sources.r1.fileHeader = true
  9. #channel
  10. a1.channels.c1.type =memory
  11. a1.channels.c1.capacity = 20000
  12. a1.channels.c1.transactionCapacity=5000
  13. #sinks
  14. a1.sinks.k1.channel = c1
  15. a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
  16. a1.sinks.k1.hostname=192.168.52.110
  17. a1.sinks.k1.port = 8888
  18. a1.sinks.k1.batchSize= 2000 

底行模式wq保存退出

执行flume

  1. flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /export/logs/flume-poll.conf -Dflume.root.logger=INFO,console

在监视的/export/data/flume下放入文件                    (黄色对应的是之前创建的配置文件)

 

执行成功

 

 代表你flume配置没有问题,接下来开始编写代码

1.导入相关依赖

 

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming-flume_2.11</artifactId>
  4. <version>2.0.2</version>
  5. </dependency>

 

2.编码

  1. package SparkStreaming
  2. import SparkStreaming.DefinedFunctionAdds.updateFunc
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  6. import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
  7. object SparkStreamingFlume {
  8. def main(args: Array[String]): Unit = {
  9. //创建sparkContext
  10. val conf: SparkConf = new SparkConf().setAppName("DefinedFunctionAdds").setMaster("local[2]")
  11. val sc = new SparkContext(conf)
  12. //去除多余的log,提高可视率
  13. sc.setLogLevel("WARN")
  14. //创建streamingContext
  15. val scc = new StreamingContext(sc,Seconds(5))
  16. //设置备份
  17. scc.checkpoint("./flume")
  18. //receive(task)拉取数据
  19. val num1: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,"192.168.52.110",8888)
  20. //获取flume中的body
  21. val value: DStream[String] = num1.map(x=>new String(x.event.getBody.array()))
  22. //切分处理,并附上数值1
  23. val result: DStream[(String, Int)] = value.flatMap(_.split(" ")).map((_,1))
  24. //结果累加
  25. val result1: DStream[(String, Int)] = result.updateStateByKey(updateFunc)
  26. result1.print()
  27. //启动并阻塞
  28. scc.start()
  29. scc.awaitTermination()
  30. }
  31. def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {
  32. val newValue: Int = currentValues.sum+historyValues.getOrElse(0)
  33. Some(newValue)
  34. }
  35. }

运行

加入新的文档到监控目录  结果

成功结束!

 

原文链接:http://www.cnblogs.com/BigDataBugKing/p/11228784.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号