经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
SparkStreaming wordCountDemo基础案例
来源:cnblogs  作者:强行快乐~  时间:2019/7/23 8:35:57  对本文有异议

 

体现sparkStreaming的秒级准实时性,所以我们需要一个能够持续输入数据的东东

1.CentOS上下载nc

创建一个scala工程,导入相关pom依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6.  
  7. <groupId>com.shiao</groupId>
  8. <artifactId>spark-01</artifactId>
  9. <version>1.0</version>
  10.  
  11. <packaging>jar</packaging>
  12.  
  13. <properties>
  14. <scala.version>2.11.8</scala.version>
  15. <hadoop.version>2.7.4</hadoop.version>
  16. <spark.version>2.0.2</spark.version>
  17. </properties>
  18.  
  19. <dependencies>
  20. <!--scala依赖-->
  21. <dependency>
  22. <groupId>org.scala-lang</groupId>
  23. <artifactId>scala-library</artifactId>
  24. <version>${scala.version}</version>
  25. </dependency>
  26. <!--spark依赖-->
  27. <dependency>
  28. <groupId>org.apache.spark</groupId>
  29. <artifactId>spark-core_2.11</artifactId>
  30. <version>${spark.version}</version>
  31. </dependency>
  32. <!--hadoop依赖-->
  33. <dependency>
  34. <groupId>org.apache.hadoop</groupId>
  35. <artifactId>hadoop-client</artifactId>
  36. <version>${hadoop.version}</version>
  37. </dependency>
  38.  
  39. <dependency>
  40. <groupId>mysql</groupId>
  41. <artifactId>mysql-connector-java</artifactId>
  42. <version>5.1.30</version>
  43. </dependency>
  44.  
  45.  
  46. <!--引入spark-streaming依赖-->
  47. <dependency>
  48. <groupId>org.apache.spark</groupId>
  49. <artifactId>spark-streaming_2.11</artifactId>
  50. <version>2.0.2</version>
  51. </dependency>
  52.  
  53. </dependencies>
  54.  
  55.  
  56.  
  57.  
  58. <!--配置插件-->
  59. <build>
  60. <plugins>
  61. <!--scala编译插件-->
  62. <plugin>
  63. <groupId>org.scala-tools</groupId>
  64. <artifactId>maven-scala-plugin</artifactId>
  65. <version>2.15.2</version>
  66. <executions>
  67. <execution>
  68. <goals>
  69. <goal>compile</goal>
  70. </goals>
  71. </execution>
  72. </executions>
  73. </plugin>
  74.  
  75. <!--项目打包插件-->
  76. <plugin>
  77. <artifactId>maven-assembly-plugin</artifactId>
  78. <configuration>
  79. <archive>
  80. <manifest>
  81. <mainClass>WordCount</mainClass>
  82. </manifest>
  83. </archive>
  84. <descriptorRefs>
  85. <descriptorRef>jar-with-dependencies</descriptorRef>
  86. </descriptorRefs>
  87. </configuration>
  88. </plugin>
  89. </plugins>
  90.  
  91. </build>
  92.  
  93. </project>

  创建一个object

编写代码

 

  1. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object SparkStreamingWordCount {
  5. def main(args: Array[String]): Unit = {
  6. //创建sparkContext
  7. val configStr = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
  8. val sc = new SparkContext(configStr)
  9. //创建streamingContext
  10. val scc = new StreamingContext(sc, Seconds(5))
  11. //去掉多余的日志,影响观看
  12. sc.setLogLevel("WARN")
  13. //创建receive获取socket数据
  14. val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.52.110", 9999)
  15. //计数处理,以逗号划分,分成一个个字符串;对每个字符串进行处理成值为1的元组;对相同单词进行相加;进行打印
  16. val value: DStream[(String, Int)] = lines.flatMap(_.split("\\,")).map((_, 1)).reduceByKey(_ + _)
  17. value.print()
  18. //开启并阻塞线程,以保持不断获取
  19. scc.start()
  20. scc.awaitTermination()
  21. }
  22. }

跑起来

 

使用scoket nc打开9999端口发送数据

 测试

 

原文链接:http://www.cnblogs.com/BigDataBugKing/p/11227899.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号