```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shiao</groupId>
    <artifactId>spark-01</artifactId>
    <version>1.0</version>

    <packaging>jar</packaging>

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.30</version>
        </dependency>
        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <!-- Plugin configuration -->
    <build>
        <plugins>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Assembly plugin: packages the project as a jar with all dependencies -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
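Because the assembly plugin above has no execution bound to the `package` phase, the fat jar is typically built with `mvn package assembly:single`, which produces `target/spark-01-1.0-jar-with-dependencies.jar`. Note that the manifest's `mainClass` is `WordCount`, while the example below defines `SparkStreamingWordCount`; unless a `WordCount` object exists elsewhere in the project, either update the manifest entry or pass the class explicitly, e.g. `spark-submit --class SparkStreamingWordCount target/spark-01-1.0-jar-with-dependencies.jar`.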
```scala
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreamingWordCount {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Silence the noisy INFO logs so the batch output is easy to read
    sc.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Create a receiver that reads text lines from a socket
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.52.110", 9999)
    // Count the words: split each line on commas, map every word to a
    // (word, 1) pair, sum the pairs per word, and print each batch's result
    val wordCounts: DStream[(String, Int)] = lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation and block the thread so it keeps receiving data
    ssc.start()
    ssc.awaitTermination()
  }
}
```
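To try the job out, one simple option (assuming netcat is available on 192.168.52.110) is to run `nc -lk 9999` there and type comma-separated words; each 5-second batch then prints the counts for the words received in that batch. Because `reduceByKey` only sees one batch at a time, the counts reset every interval. If a running total across batches is wanted instead, Spark Streaming's `updateStateByKey` can be used; below is a minimal sketch under the same host/port assumptions, with a placeholder checkpoint path:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingStatefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingStatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    // updateStateByKey needs a checkpoint directory to persist the running state;
    // "./spark-checkpoint" is a placeholder local path for this local[2] test
    ssc.checkpoint("./spark-checkpoint")

    val lines = ssc.socketTextStream("192.168.52.110", 9999)

    // Merge this batch's counts for a word into its running total
    def updateFunc(newValues: Seq[Int], runningTotal: Option[Int]): Option[Int] =
      Some(newValues.sum + runningTotal.getOrElse(0))

    val totals: DStream[(String, Int)] = lines
      .flatMap(_.split(","))
      .map((_, 1))
      .updateStateByKey[Int](updateFunc _)

    totals.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The checkpoint directory is required by `updateStateByKey` so the running state can be recovered after a failure; on a real cluster it would typically point at an HDFS path rather than a local one.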