经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
实时计算框架:Spark集群搭建与入门案例
来源:cnblogs  作者:知了一笑  时间:2021/5/6 17:32:29  对本文有异议

一、Spark概述

1、Spark简介

Spark是专为大规模数据处理而设计的,基于内存快速通用,可扩展的集群计算引擎,实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流,运算速度相比于MapReduce得到了显著的提高。

2、运行结构

Driver

运行Spark的Applicaion中main()函数,会创建SparkContext,SparkContext负责和Cluster-Manager进行通信,并负责申请资源、任务分配和监控等。

ClusterManager

负责申请和管理在WorkerNode上运行应用所需的资源,可以高效地在一个计算节点到数千个计算节点之间伸缩计算,目前包括Spark原生的ClusterManager、ApacheMesos和HadoopYARN。

Executor

Application运行在WorkerNode上的一个进程,作为工作节点负责运行Task任务,并且负责将数据存在内存或者磁盘上,每个 Application都有各自独立的一批Executor,任务间相互独立。

二、环境部署

1、Scala环境

安装包管理

  1. [root@hop01 opt]# tar -zxvf scala-2.12.2.tgz
  2. [root@hop01 opt]# mv scala-2.12.2 scala2.12

配置变量

  1. [root@hop01 opt]# vim /etc/profile
  2. export SCALA_HOME=/opt/scala2.12
  3. export PATH=$PATH:$SCALA_HOME/bin
  4. [root@hop01 opt]# source /etc/profile

版本查看

  1. [root@hop01 opt]# scala -version

Scala环境需要部署在Spark运行的相关服务节点上。

2、Spark基础环境

安装包管理

  1. [root@hop01 opt]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz
  2. [root@hop01 opt]# mv spark-2.1.1-bin-hadoop2.7 spark2.1

配置变量

  1. [root@hop01 opt]# vim /etc/profile
  2. export SPARK_HOME=/opt/spark2.1
  3. export PATH=$PATH:$SPARK_HOME/bin
  4. [root@hop01 opt]# source /etc/profile

版本查看

  1. [root@hop01 opt]# spark-shell

3、Spark集群配置

服务节点

  1. [root@hop01 opt]# cd /opt/spark2.1/conf/
  2. [root@hop01 conf]# cp slaves.template slaves
  3. [root@hop01 conf]# vim slaves
  4. hop01
  5. hop02
  6. hop03

环境配置

  1. [root@hop01 conf]# cp spark-env.sh.template spark-env.sh
  2. [root@hop01 conf]# vim spark-env.sh
  3. export JAVA_HOME=/opt/jdk1.8
  4. export SCALA_HOME=/opt/scala2.12
  5. export SPARK_MASTER_IP=hop01
  6. export SPARK_LOCAL_IP=安装节点IP
  7. export SPARK_WORKER_MEMORY=1g
  8. export HADOOP_CONF_DIR=/opt/hadoop2.7/etc/hadoop

注意SPARK_LOCAL_IP的配置。

4、Spark启动

依赖Hadoop相关环境,所以要先启动。

  1. 启动:/opt/spark2.1/sbin/start-all.sh
  2. 停止:/opt/spark2.1/sbin/stop-all.sh

这里在主节点会启动两个进程:Master和Worker,其他节点只启动一个Worker进程。

5、访问Spark集群

默认端口是:8080。

  1. http://hop01:8080/

运行基础案例:

  1. [root@hop01 spark2.1]# cd /opt/spark2.1/
  2. [root@hop01 spark2.1]# bin/spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.1.1.jar
  3. 运行结果:Pi is roughly 3.1455357276786384

三、开发案例

1、核心依赖

依赖Spark2.1.1版本:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.11</artifactId>
  4. <version>2.1.1</version>
  5. </dependency>

引入Scala编译插件:

  1. <plugin>
  2. <groupId>net.alchim31.maven</groupId>
  3. <artifactId>scala-maven-plugin</artifactId>
  4. <version>3.2.2</version>
  5. <executions>
  6. <execution>
  7. <goals>
  8. <goal>compile</goal>
  9. <goal>testCompile</goal>
  10. </goals>
  11. </execution>
  12. </executions>
  13. </plugin>

2、案例代码开发

读取指定位置的文件,并输出文件内容单词统计结果。

  1. @RestController
  2. public class WordWeb implements Serializable {
  3. @GetMapping("/word/web")
  4. public String getWeb (){
  5. // 1、创建Spark的配置对象
  6. SparkConf sparkConf = new SparkConf().setAppName("LocalCount")
  7. .setMaster("local[*]");
  8. // 2、创建SparkContext对象
  9. JavaSparkContext sc = new JavaSparkContext(sparkConf);
  10. sc.setLogLevel("WARN");
  11. // 3、读取测试文件
  12. JavaRDD lineRdd = sc.textFile("/var/spark/test/word.txt");
  13. // 4、行内容进行切分
  14. JavaRDD wordsRdd = lineRdd.flatMap(new FlatMapFunction() {
  15. @Override
  16. public Iterator call(Object obj) throws Exception {
  17. String value = String.valueOf(obj);
  18. String[] words = value.split(",");
  19. return Arrays.asList(words).iterator();
  20. }
  21. });
  22. // 5、切分的单词进行标注
  23. JavaPairRDD wordAndOneRdd = wordsRdd.mapToPair(new PairFunction() {
  24. @Override
  25. public Tuple2 call(Object obj) throws Exception {
  26. //将单词进行标记:
  27. return new Tuple2(String.valueOf(obj), 1);
  28. }
  29. });
  30. // 6、统计单词出现次数
  31. JavaPairRDD wordAndCountRdd = wordAndOneRdd.reduceByKey(new Function2() {
  32. @Override
  33. public Object call(Object obj1, Object obj2) throws Exception {
  34. return Integer.parseInt(obj1.toString()) + Integer.parseInt(obj2.toString());
  35. }
  36. });
  37. // 7、排序
  38. JavaPairRDD sortedRdd = wordAndCountRdd.sortByKey();
  39. List<Tuple2> finalResult = sortedRdd.collect();
  40. // 8、结果打印
  41. for (Tuple2 tuple2 : finalResult) {
  42. System.out.println(tuple2._1 + " ===> " + tuple2._2);
  43. }
  44. // 9、保存统计结果
  45. sortedRdd.saveAsTextFile("/var/spark/output");
  46. sc.stop();
  47. return "success" ;
  48. }
  49. }

打包执行结果:

查看文件输出:

  1. [root@hop01 output]# vim /var/spark/output/part-00000

四、源代码地址

  1. GitHub·地址
  2. https://github.com/cicadasmile/big-data-parent
  3. GitEE·地址
  4. https://gitee.com/cicadasmile/big-data-parent

阅读标签

Java基础】【设计模式】【结构与算法】【Linux系统】【数据库

分布式架构】【微服务】【大数据组件】【SpringBoot进阶】【Spring&Boot基础

数据分析】【技术导图】【 职场

技术系列

OLAP引擎:Druid组件进行数据统计分析

OLAP引擎:Presto组件跨数据源分析

OLAP引擎:ClickHouse高性能列式查询

原文链接:http://www.cnblogs.com/cicada-smile/p/14703022.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号