这里使用的是低级API,因为高级API非常不好用,需要繁琐的配置,也不够自动化,却和低级API的效果一样,所以这里以低级API做演示
你得有zookeeper和kafka
我这里是3台节点主机
架构图

与高级API的区别,简单并行(不需要创造多个输入流,它会自动并行读取kafka的数据),高效(不会像receiver数据被copy两次),一次性语义(缺点:无法使用zookeeper的监控工具)
1.创建maven工程
首先添加pom依赖,其它运行依赖请参考 sparkStreaming整合WordCount
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
- <version>2.0.2</version>
- </dependency>
2.启动zookeeper集群
我把zookeeper集群弄成了个脚本,直接执行脚本启动所有zookeeper

启动成功
3.启动kafka集群
我这里是3台主机,三台都需要
进入目录
- cd /export/servers/kafka/bin/
启动
- kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties

成功
4.测试kafka
创建topic
- cd /export/servers/kafka_2.11-0.10.2.1
- bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic kafka_spark
通过生产者发送消息
- cd /export/servers/kafka_2.11-0.10.2.1
- bin/kafka-console-producer.sh --broker-list node01:9092 --topic kafka_spark
想发啥,发啥。此时通过创建AP接收生产者发送的数据

编写代码
- package SparkStreaming
- import kafka.serializer.StringDecoder
- import org.apache.spark.streaming.dstream.{DStream, InputDStream}
- import org.apache.spark.streaming.kafka.KafkaUtils
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.{SparkConf, SparkContext}
- object SparkStreamingKafka {
- def main(args: Array[String]): Unit = {
- // 1.创建SparkConf对象
- val conf: SparkConf = new SparkConf()
- .setAppName("SparkStreamingKafka_Direct")
- .setMaster("local[2]")
- // 2.创建SparkContext对象
- val sc: SparkContext = new SparkContext(conf)
- sc.setLogLevel("WARN")
- // 3.创建StreamingContext对象
- /**
- * 参数说明:
- * 参数一:SparkContext对象
- * 参数二:每个批次的间隔时间
- */
- val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
- //设置checkpoint目录
- ssc.checkpoint("./Kafka_Direct")
- // 4.通过KafkaUtils.createDirectStream对接kafka(采用是kafka低级api偏移量不受zk管理)
- // 4.1.配置kafka相关参数
- val kafkaParams=Map("metadata.broker.list"->"192.168.52.110:9092,192.168.52.120:9092,192.168.52.130:9092","group.id"->"kafka_Direct")
- // 4.2.定义topic
- val topics=Set("kafka_spark")
- val dstream: InputDStream[(String, String)] = KafkaUtils
- .createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
- // 5.获取topic中的数据
- val topicData: DStream[String] = dstream.map(_._2)
- // 6.切分每一行,每个单词计为1
- val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
- // 7.相同单词出现的次数累加
- val resultDS: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
- // 8.通过Output Operations操作打印数据
- resultDS.print()
- // 9.开启流式计算
- ssc.start()
- // 阻塞一直运行
- ssc.awaitTermination()
- }
- }
生产者生产数据

API接收控制台打印计算结果
