经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
SparkStreaming 整合kafka Demo
来源:cnblogs  作者:强行快乐~  时间:2019/7/24 8:42:25  对本文有异议

 

这里使用的是低级API,因为高级API非常不好用,需要繁琐的配置,也不够自动化,却和低级API的效果一样,所以这里以低级API做演示

你得有zookeeper和kafka

我这里是3台节点主机

架构图

与高级API的区别,简单并行(不需要创造多个输入流,它会自动并行读取kafka的数据),高效(不会像receiver数据被copy两次),一次性语义(缺点:无法使用zookeeper的监控工具)

 

1.创建maven工程

首先添加pom依赖,其它运行依赖请参考 sparkStreaming整合WordCount

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  4. <version>2.0.2</version>
  5. </dependency>

2.启动zookeeper集群

我把zookeeper集群弄成了个脚本,直接执行脚本启动所有zookeeper

 

启动成功

3.启动kafka集群

我这里是3台主机,三台都需要

进入目录

  1. cd /export/servers/kafka/bin/

启动

  1. kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties

 

成功

4.测试kafka

创建topic

  1. cd /export/servers/kafka_2.11-0.10.2.1
  1. bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic kafka_spark

通过生产者发送消息

  1. cd /export/servers/kafka_2.11-0.10.2.1
  1. bin/kafka-console-producer.sh --broker-list node01:9092 --topic kafka_spark

想发啥,发啥。此时通过创建AP接收生产者发送的数据

编写代码

  1. package SparkStreaming
  2. import kafka.serializer.StringDecoder
  3. import org.apache.spark.streaming.dstream.{DStream, InputDStream}
  4. import org.apache.spark.streaming.kafka.KafkaUtils
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. import org.apache.spark.{SparkConf, SparkContext}
  7. object SparkStreamingKafka {
  8. def main(args: Array[String]): Unit = {
  9. // 1.创建SparkConf对象
  10. val conf: SparkConf = new SparkConf()
  11. .setAppName("SparkStreamingKafka_Direct")
  12. .setMaster("local[2]")
  13. // 2.创建SparkContext对象
  14. val sc: SparkContext = new SparkContext(conf)
  15. sc.setLogLevel("WARN")
  16. // 3.创建StreamingContext对象
  17. /**
  18. * 参数说明:
  19. * 参数一:SparkContext对象
  20. * 参数二:每个批次的间隔时间
  21. */
  22. val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
  23. //设置checkpoint目录
  24. ssc.checkpoint("./Kafka_Direct")
  25. // 4.通过KafkaUtils.createDirectStream对接kafka(采用是kafka低级api偏移量不受zk管理)
  26. // 4.1.配置kafka相关参数
  27. val kafkaParams=Map("metadata.broker.list"->"192.168.52.110:9092,192.168.52.120:9092,192.168.52.130:9092","group.id"->"kafka_Direct")
  28. // 4.2.定义topic
  29. val topics=Set("kafka_spark")
  30. val dstream: InputDStream[(String, String)] = KafkaUtils
  31. .createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
  32. // 5.获取topic中的数据
  33. val topicData: DStream[String] = dstream.map(_._2)
  34. // 6.切分每一行,每个单词计为1
  35. val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
  36. // 7.相同单词出现的次数累加
  37. val resultDS: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
  38. // 8.通过Output Operations操作打印数据
  39. resultDS.print()
  40. // 9.开启流式计算
  41. ssc.start()
  42. // 阻塞一直运行
  43. ssc.awaitTermination()
  44. }
  45. }

 

生产者生产数据

API接收控制台打印计算结果

 

原文链接:http://www.cnblogs.com/BigDataBugKing/p/11233729.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号