经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
Spark Core知识点复习-1
来源:cnblogs  作者:勇者无畏强者无敌  时间:2019/11/12 12:09:47  对本文有异议

Day1111

  1. Spark任务调度
  2. Spark几个重要组件
  3. Spark Core
  4. RDD的概念和特性
  5. 生成RDD的两种类型
  6. RDD算子的两种类型
  7. 算子练习
  8. 分区
  9. RDD的依赖关系
  10. DAG:有向无环图
  11. 任务提交
  12. 缓存
  13. checkPoint
  14. 自定义排序
  15. 自定义分区器
  16. 自定义累加器
  17. 广播变量
  18. Spark Shuffle过程
  19. Spark优化过程
  20. SparkSQL
  21. 集成Hive

一.Spark Core

1 Spark任务调度:

  1. |->:standalone
  2. |->:local
  3. |->:Yarn
  4. |->:Mesos

2 Spark几个重要的组件

  1. |->:Master:管理Worker,负责接收Driver发送的注册信息(任务信息)
  2. |->:Worker:负责本节点资源和任务的管理,启动Exector进程
  3. |->:Exector:负责计算任务
  4. |->:Driver:用来提交任务(SparkSubmit进程)

3 Spark Core: RDD的概念和特性

  1. 数据的描述
  2. 1):一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
  3. 2):一个计算每个分区的函数。SparkRDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  4. 3):RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  5. 4):一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-valueRDD,才会有Partitioner,非key-valueRDDParititioner的值是NonePartitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  6. 5):一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
  7. 基本特性:可分区,函数,依赖,分区器,就近原则
  8. RDD的弹性
  9. 1): 自动进行内存和磁盘数据存储的切换
  10. Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换
  11. 2): 基于血统的高效容错机制
  12. RDD进行转换和动作的时候,会形成RDDLineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。
  13. 3): Task如果失败会自动进行特定次数的重试
  14. RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。
  15. 4): Stage如果失败会自动进行特定次数的重试
  16. 如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。
  17. 5): CheckpointPersist可主动或被动触发
  18. RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。
  19. 6): 数据调度弹性
  20. Spark把这个JOB执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
  21. 7): 数据分片的高度弹性
  22. 可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。
  23. RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets):它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。

4 生成RDD的两种类型

  1. 1:从集合中创建RDD
  2. val conf = new SparkConf().setAppName("Test").setMaster("local")
  3. val sc = new SparkContext(conf)
  4. //这两个方法都有第二参数是一个默认值2 分片数量(partition的数量)
  5. //scala集合通过makeRDD创建RDD,底层实现也是parallelize
  6. val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
  7. //scala集合通过parallelize创建RDD
  8. val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
  9. 2:从外部存储创建RDD
  10. al rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt")

5 RDD算子的两种类型

  1. |->:transformation算子:转化成新RDD
  2. |->:Action算子:转化成非RDD

6 算子练习

  1. |->迭代类型算子:map,flatMap,mapPartitions,foreach,foreachPartition...
  2. |->shuffle类算子:
  3. |->byKey:groupBy,reduceByKey(不一定),groupByKey,sortBy,SortByKey...
  4. |->重分区算子:repartition(必然发生shuffle),colaesce(不一定,多分区变少分区不需要发生shuffle),partitionBy(发生shuffle),repartitionAndSortWithinPartitions
  5. |->join类算子:join(不一定),fullOuterJoi,leftOuterJoin,rightOuterJoin
  6. |->去重类算子:distinct,countApproxDistinct(返回去重的个数)
  7. |->聚合类算子:reduce,reduceByKey,aggregate,aggregateByKey,fold,foldByKey,combineByKey,combineByKey,countByKey,countByValue
  8. |->排序类算子:sortBy,sortByKey
  9. 优化:
  10. 1.map,mapPartition优化:一定要分数据量和对应的物力资源来确定到底使用哪个算子
  11. 数据量 | map | mapPartition
  12. | 每个元素 | 每个分区
  13. --------------------------------------
  14. 比较大 | | 优先选择
  15. 海量数据 | 优先选择 | 可能发生OOM
  16. 2.foreach,foreachPartition优化:需要考虑到持久化时能够承受的连接数
  17. 场景 | foreach | foreachPartition
  18. | 每个元素 | 每个分区
  19. ---------------------------------------------------------
  20. 连接数据库 | 每个元素对应一个连接 | 优先选择(一个分区对应一个连接)
  21. 海量数据 | 优先选择 | 可能发生OOM
  22. 3.groupByKey,reduceByKey:如果能用reduceByKey解决的需求就用reduceByKey
  23. 场景 | groupByKey | reduceByKey(局部聚合)
  24. ---------------------------------------------------------
  25. | | 优先选择
  26. 4.join+filter(过滤):为了避免join过程产生很大的数据集的情况,可以先filterjoin
  27. filter:过滤后再计算可能发生严重的数据倾斜,可以在过滤后先调整
  28. 5.序列化调优:
  29. :RDD在计算过程中,调用的算子和传入算子的函数都是在Executor端执行,除此之外都是在Driver端执行的
  1. class SearchFunction(val query: String) extends Serializable {
  2. //第一个方法是判断输入的字符串是否存在query 存在返回true,不存在返回false
  3. def isMatch(s: String): Boolean = {
  4. s.contains(query)
  5. }
  6. // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this"
  7. def getMatchFunctionReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => this.isMatch(x))// 等价于:rdd.filter(isMatch)
  8. // 问题:"query"表示"this.query",因此我们要传递整个"this"
  9. def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => x.contains(this.query))
  10. // 安全:只把我们需要的字段拿出来放入局部变量中
  11. def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
  12. val _query = this.query
  13. rdd.filter(x => x.contains(_query))
  14. }
  15. }
  16. object SearchFunctions {
  17. def main(args: Array[String]): Unit = {
  18. val conf = new SparkConf().setAppName(SearchFunctions.getClass.getName).setMaster("local[2]")
  19. val sc = new SparkContext(conf)
  20. val rdd = sc.parallelize(List("hello java", "hello scala hello", "hello hello"))
  21. val sf = new SearchFunction("hello")
  22. sf.getMatchFunctionReference(rdd).foreach(println)
  23. sf.getMatchesFieldReference(rdd).foreach(println)
  24. sf.getMatchesNoReference(rdd).foreach(println)
  25. sc.stop()
  26. }
  27. }
  1. class Rules extends Serializable {
  2. val rulesMap = Map("xiaoli" -> 23, "xiaoming" -> 26)
  3. }
  1. object ObjectRules extends Serializable {
  2. val rulesMap = Map("jack" -> 27, "lucy" -> 22)
  3. }
  1. object SerializeTest_1 {
  2. def main(args: Array[String]): Unit = {
  3. val conf = SparkUtil.getSparkConf
  4. val sc = new SparkContext(conf)
  5. val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
  6. //map方法中的函数是在Executor的某个Task中执行的
  7. val res = lines.map(x => {
  8. val rules = new Rules
  9. val hostname = InetAddress.getLocalHost.getHostName
  10. val threadName = Thread.currentThread().getName
  11. (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
  12. })
  13. println(res.collect.toBuffer)
  14. /*
  15. ArrayBuffer(
  16. (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@5c3d762c),
  17. (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@736d5f3b),
  18. (localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@374cd5ba))
  19. */
  20. sc.stop()
  21. }
  22. }
  1. object SerializeTest_2 {
  2. def main(args: Array[String]): Unit = {
  3. val conf = SparkUtil.getSparkConf
  4. val sc = new SparkContext(conf)
  5. val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
  6. //该对象在Driver中创建
  7. val rules = new Rules
  8. //map方法中的函数是在Executor的某个Task中执行的
  9. val res = lines.map(x => {
  10. val hostname = InetAddress.getLocalHost.getHostName
  11. val threadName = Thread.currentThread().getName
  12. (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
  13. })
  14. println(res.collect.toBuffer)
  15. /*
  16. ArrayBuffer(
  17. (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@48158406),
  18. (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@a287af2),
  19. (localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@a287af2))
  20. */
  21. sc.stop()
  22. }
  23. }
  1. object SerializeTest_3 {
  2. def main(args: Array[String]): Unit = {
  3. val conf = SparkUtil.getSparkConf
  4. val sc = new SparkContext(conf)
  5. val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
  6. //该对象在Driver中创建单例对象
  7. val rules = ObjectRules
  8. //map方法中的函数是在Executor的某个Task中执行的
  9. val res = lines.map(x => {
  10. val hostname = InetAddress.getLocalHost.getHostName
  11. val threadName = Thread.currentThread().getName
  12. (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
  13. })
  14. println(res.collect.toBuffer)
  15. /*
  16. ArrayBuffer(
  17. (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
  18. (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
  19. (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593))
  20. */
  21. sc.stop()
  22. }
  23. }
  1. object SerializeTest_4 {
  2. def main(args: Array[String]): Unit = {
  3. val conf = SparkUtil.getSparkConf
  4. val sc = new SparkContext(conf)
  5. val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
  6. //该对象在Driver中创建单例对象
  7. //map方法中的函数是在Executor的某个Task中执行的
  8. val res = lines.map(x => {
  9. val hostname = InetAddress.getLocalHost.getHostName
  10. val threadName = Thread.currentThread().getName
  11. /*
  12. 不用在Driver端去创建对象,Rules不用实现序列化
  13. */
  14. (hostname, threadName, ObjectRules.rulesMap.getOrElse(x, 0), ObjectRules.toString)
  15. })
  16. println(res.collect.toBuffer)
  17. /*
  18. ArrayBuffer(
  19. (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
  20. (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
  21. (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6))
  22. */
  23. sc.stop()
  24. }
  25. }

7 分区

  1. textFile分片过程:由指定的cpu核数+指定的分区数+block块的大小+文件的个数,经过分片算法得到最终的分区数

8 RDD的依赖关系

  1. |->宽依赖:一对多 一个父RDD分区会被多个子RDD使用
  2. |->窄依赖:一对一,多对一
  3. |->为什么区分宽窄依赖:
  4. |->1:有宽窄依赖就可以进行相应的容错
  5. |->2:宽依赖决定了stage的划分的依据

9 DAG

  1. 为什么划分stage:主要是为了生成task,stage划分过程实际上就将rdd的依赖按照shuffle来分为一个到多个的范围,task执行过程根本不会跨stage
  2. task数量 = stage数量 * 分区数(注:前提是没有手动更改分区数)
  3. 如果手动更改分区数,该stagetask数据由最后的分区数决定的

原文链接:http://www.cnblogs.com/taoxuefengblogs/p/11840546.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号