课程表

Spark 基础

Spark RDDs

Spark Streaming

Spark SQL

GraphX编程指南

工具箱
速查手册

Spark 编程介绍

当前位置:免费教程 » 数据库/运维 » Spark

Spark 编程指南简体中文版

本书出处:http://endymecy.gitbooks.io/spark-programming-guide-zh-cn/content/

从这里开始

你能够从spark官方网站查看一些spark运行例子。另外,Spark的example目录包含几个Spark例子,你能够通过如下方式运行Java或者scala例子:

  1. ./bin/run-example SparkPi

为了优化你的项目, configurationtuning指南提高了最佳实践的信息。保证你保存在内存中的数据是有效的格式是非常重要的事情。为了给部署操作提高帮助,集群模式概述介绍了包含分布式操作和支持集群管理的组件。

最后,完整的API文档可以在后面链接scala,java,python中查看。

Copyright

本文翻译自Spark 官方文档


设置Spark

在本机设置和运行Spark非常简单。你只需要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上运行Spark。确保java程序在PATH环境变量中,或者设置了JAVA_HOME环境变量。类似的,python也要在PATH中。

假设你已经安装了Java和Python:

  1. 访问Spark下载页
  2. 选择Spark最新发布版(本文写作时是1.2.0),一个预构建的Hadoop 2.4包,直接下载。

现在,如何继续依赖于你的操作系统,靠你自己去探索了。Windows用户可以在评论区对如何设置的提示进行评论。

一般,我的建议是按照下面的步骤(在POSIX操作系统上):

1.解压Spark

  1. ~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

2.将解压目录移动到有效应用程序目录中(如Windows上的

  1. ~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

3.创建指向该Spark版本的符号链接到<spark目录。这样你可以简单地下载新/旧版本的Spark,然后修改链接来管理Spark版本,而不用更改路径或环境变量。

  1. ~$ ln -s /srv/spark-1.2.0 /srv/spark

4.修改BASH配置,将Spark添加到PATH中,设置SPARK_HOME环境变量。这些小技巧在命令行上会帮到你。在Ubuntu上,只要编辑~/.bash_profile或~/.profile文件,将以下语句添加到文件中:

  1. export SPARK_HOME=/srv/sparkexport PATH=$SPARK_HOME/bin:$PATH

5.source这些配置(或者重启终端)之后,你就可以在本地运行一个pyspark解释器。执行pyspark命令,你会看到以下结果:

  1. ~$ pyspark
  2. Python 2.7.8 (default, Dec 2 2014, 12:45:58)
  3. [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
  4. Type "help", "copyright", "credits" or "license" for more information.
  5. Spark assembly has been built with Hive, including Datanucleus jars on classpath
  6. Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
  7. [… snip …]
  8. Welcome to
  9. ____ __
  10. / __/__ ___ _____/ /__
  11. _\ \/ _ \/ _ `/ __/ `_/
  12. /__ / .__/\_,_/_/ /_/\_\ version 1.2.0
  13. /_/
  14. Using Python version 2.7.8 (default, Dec 2 2014 12:45:58)
  15. SparkContext available as sc.
  16. >>>

现在Spark已经安装完毕,可以在本机以”单机模式“(standalone mode)使用。你可以在本机开发应用并提交Spark作业,这些作业将以多进程/多线程模式运行的,或者,配置该机器作为一个集群的客户端(不推荐这样做,因为在Spark作业中,驱动程序(driver)是个很重要的角色,并且应该与集群的其他部分处于相同网络)。可能除了开发,你在本机使用Spark做得最多的就是利用spark-ec2脚本来配置Amazon云上的一个EC2 Spark集群了。

简略Spark输出

Spark(和PySpark)的执行可以特别详细,很多INFO日志消息都会打印到屏幕。开发过程中,这些非常恼人,因为可能丢失Python栈跟踪或者print的输出。为了减少Spark输出 – 你可以设置$SPARK_HOME/conf下的log4j。首先,拷贝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”扩展名。

  1. ~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

编辑新文件,用WARN替换代码中出现的INFO。你的log4j.properties文件类似:

  1. # Set everything to be logged to the console
  2. log4j.rootCategory=WARN, console
  3. log4j.appender.console=org.apache.log4j.ConsoleAppender
  4. log4j.appender.console.target=System.err
  5. log4j.appender.console.layout=org.apache.log4j.PatternLayout
  6. log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
  7. # Settings to quiet third party logs that are too verbose
  8. log4j.logger.org.eclipse.jetty=WARN
  9. log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
  10. log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
  11. log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

现在运行PySpark,输出消息将会更简略!感谢推特的@genomegeek在一次District Data Labs的研讨会中指出这一点。

在Spark中使用IPython Notebook

当搜索有用的Spark小技巧时,我发现了一些文章提到在PySpark中配置IPython notebook。IPython notebook对数据科学家来说是个交互地呈现科学和理论工作的必备工具,它集成了文本和Python代码。对很多数据科学家,IPython notebook是他们的Python入门,并且使用非常广泛,所以我想值得在本文中提及。

这里的大部分说明都来改编自IPython notebook: 在PySpark中设置IPython。但是,我们将聚焦在本机以单机模式将IPtyon shell连接到PySpark,而不是在EC2集群。如果你想在一个集群上使用PySpark/IPython,查看并评论下文的说明吧!

  1. 1.为Spark创建一个iPython notebook配置
  1. ~$ ipython profile create spark
  2. [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
  3. [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
  4. [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

记住配置文件的位置,替换下文各步骤相应的路径:

2.创建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代码:

  1. import os
  2. import sys
  3. # Configure the environment
  4. if 'SPARK_HOME' not in os.environ:
  5. os.environ['SPARK_HOME'] = '/srv/spark'
  6. # Create a variable for our root path
  7. SPARK_HOME = os.environ['SPARK_HOME']
  8. # Add the PySpark/py4j to the Python Path
  9. sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
  10. sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

3.使用我们刚刚创建的配置来启动IPython notebook。

  1. ~$ ipython notebook --profile spark

4.在notebook中,你应该能看到我们刚刚创建的变量。

  1. print SPARK_HOME

5.在IPython notebook最上面,确保你添加了Spark context。

  1. from pyspark import SparkContext
  2. sc = SparkContext( 'local', 'pyspark')

6.使用IPython做个简单的计算来测试Spark context。

  1. def isprime(n):
  2. """
  3. check if integer n is a prime
  4. """
  5. # make sure n is a positive integer
  6. n = abs(int(n))
  7. # 0 and 1 are not primes
  8. if n < 2:
  9. return False
  10. # 2 is the only even prime number
  11. if n == 2:
  12. return True
  13. # all other even numbers are not primes
  14. if not n & 1:
  15. return False
  16. # range starts with 3 and only needs to go up the square root of n
  17. # for all odd numbers
  18. for x in range(3, int(n**0.5)+1, 2):
  19. if n % x == 0:
  20. return False
  21. return True
  22. # Create an RDD of numbers from 0 to 1,000,000
  23. nums = sc.parallelize(xrange(1000000))
  24. # Compute the number of primes in the RDD
  25. print nums.filter(isprime).count()

编辑提示:上面配置了一个使用PySpark直接调用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接启动一个notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark

哪个方法好用取决于你使用PySpark和IPython的具体情景。前一个允许你更容易地使用IPython notebook连接到一个集群,因此是我喜欢的方法。

在EC2上使用Spark

在讲授使用Hadoop进行分布式计算时,我发现很多可以通过在本地伪分布式节点(pseudo-distributed node)或以单节点模式(single-node mode)讲授。但是为了了解真正发生了什么,就需要一个集群。当数据变得庞大,这些书面讲授的技能和真实计算需求间经常出现隔膜。如果你肯在学习详细使用Spark上花钱,我建议你设置一个快速Spark集群做做实验。 包含5个slave(和1个master)每周大概使用10小时的集群每月大概需要$45.18。

完整的讨论可以在Spark文档中找到:在EC2上运行Spark在你决定购买EC2集群前一定要通读这篇文档!我列出了一些关键点:

  1. 通过AWS Console获取AWS EC2 key对(访问key和密钥key)。
  2. 将key对导出到你的环境中。在shell中敲出以下命令,或者将它们添加到配置中。
  1. export AWS_ACCESS_KEY_ID=myaccesskeyid
  2. export AWS_SECRET_ACCESS_KEY=mysecretaccesskey

注意不同的工具使用不同的环境名称,确保你用的是Spark脚本所使用的名称。

3.启动集群:

  1. ~$ cd $SPARK_HOME/ec2
  2. ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>

4.SSH到集群来运行Spark作业。

  1. ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>

5.销毁集群

  1. ec2$ ./spark-ec2 destroy &lt;cluster-name&gt;.

这些脚本会自动创建一个本地的HDFS集群来添加数据,copy-dir命令可以同步代码和数据到该集群。但是你最好使用S3来存储数据,创建使用s3://URI来加载数据的RDDs。

Spark是什么?

既然设置好了Spark,现在我们讨论下Spark是什么。Spark是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。如果你熟悉Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop使用HDFS来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。类似的,Spark拥有多种语言的函数式编程API,提供了除map和reduce之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。

本质上,RDD是种编程抽象,代表可以跨机器进行分割的只读对象集合。RDD可以从一个继承结构(lineage)重建(因此可以容错),通过并行操作访问,可以读写HDFS或S3这样的分布式存储,更重要的是,可以缓存到worker节点的内存中进行立即重用。由于RDD可以被缓存在内存中,Spark对迭代应用特别有效,因为这些应用中,数据是在整个算法运算过程中都可以被重用。大多数机器学习和最优化算法都是迭代的,使得Spark对数据科学来说是个非常有效的工具。另外,由于Spark非常快,可以通过类似Python REPL的命令行提示符交互式访问。

Spark库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似SQL查询的支持,机器学习和图算法,甚至对实时流数据的支持。

核心组件如下:

  • Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
  • Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。
  • Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
  • MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。
  • GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。

由于这些组件满足了很多大数据需求,也满足了很多数据科学任务的算法和计算上的需要,Spark快速流行起来。不仅如此,Spark也提供了使用Scala、Java和Python编写的API;满足了不同团体的需求,允许更多数据科学家简便地采用Spark作为他们的大数据解决方案。

对Spark编程

编写Spark应用与之前实现在Hadoop上的其他数据流语言类似。代码写入一个惰性求值的驱动程序(driver program)中,通过一个动作(action),驱动代码被分发到集群上,由各个RDD分区上的worker来执行。然后结果会被发送回驱动程序进行聚合或编译。本质上,驱动程序创建一个或多个RDD,调用操作来转换RDD,然后调用动作处理被转换后的RDD。

这些步骤大体如下:

  1. 定义一个或多个RDD,可以通过获取存储在磁盘上的数据(HDFS,Cassandra,HBase,Local Disk),并行化内存中的某些集合,转换(transform)一个已存在的RDD,或者,缓存或保存。
  2. 通过传递一个闭包(函数)给RDD上的每个元素来调用RDD上的操作。Spark提供了除了Map和Reduce的80多种高级操作。
  3. 使用结果RDD的动作(action)(如count、collect、save等)。动作将会启动集群上的计算。

当Spark在一个worker上运行闭包时,闭包中用到的所有变量都会被拷贝到节点上,但是由闭包的局部作用域来维护。Spark提供了两种类型的共享变量,这些变量可以按照限定的方式被所有worker访问。广播变量会被分发给所有worker,但是是只读的。累加器这种变量,worker可以使用关联操作来“加”,通常用作计数器。

Spark应用本质上通过转换和动作来控制RDD。后续文章将会深入讨论,但是理解了这个就足以执行下面的例子了。

Spark的执行

简略描述下Spark的执行。本质上,Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。

重点要记住的是应用代码由驱动程序发送给执行者,执行者指定context和要运行的任务。执行者与驱动程序通信进行数据分享或者交互。驱动程序是Spark作业的主要参与者,因此需要与集群处于相同的网络。这与Hadoop代码不同,Hadoop中你可以在任意位置提交作业给JobTracker,JobTracker处理集群上的执行。

与Spark交互

使用Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark。

  1. ~$ pyspark
  2. [… snip …]
  3. >>>

PySpark将会自动使用本地Spark配置创建一个SparkContext。你可以通过sc变量来访问它。我们来创建第一个RDD。

  1. >>> text = sc.textFile("shakespeare.txt")
  2. >>> print text
  3. shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

textFile方法将莎士比亚全部作品加载到一个RDD命名文本。如果查看了RDD,你就可以看出它是个MappedRDD,文件路径是相对于当前工作目录的一个相对路径(记得传递磁盘上正确的shakespear.txt文件路径)。我们转换下这个RDD,来进行分布式计算的“hello world”:“字数统计”。

  1. >>> from operator import add
  2. >>> def tokenize(text):
  3. ... return text.split()
  4. ...
  5. >>> words = text.flatMap(tokenize)
  6. >>> print words
  7. PythonRDD[2] at RDD at PythonRDD.scala:43

我们首先导入了add操作符,它是个命名函数,可以作为加法的闭包来使用。我们稍后再使用这个函数。首先我们要做的是把文本拆分为单词。我们创建了一个tokenize函数,参数是文本片段,返回根据空格拆分的单词列表。然后我们通过给flatMap操作符传递tokenize闭包对textRDD进行变换创建了一个wordsRDD。你会发现,words是个PythonRDD,但是执行本应该立即进行。显然,我们还没有把整个莎士比亚数据集拆分为单词列表。

如果你曾使用MapReduce做过Hadoop版的“字数统计”,你应该知道下一步是将每个单词映射到一个键值对,其中键是单词,值是1,然后使用reducer计算每个键的1总数。

首先,我们map一下。

  1. >>> wc = words.map(lambda x: (x,1))
  2. >>> print wc.toDebugString()
  3. (2) PythonRDD[3] at RDD at PythonRDD.scala:43
  4. | shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
  5. | shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2

我使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。这行代码将会把lambda映射到每个单词。因此,每个x都是一个单词,每个单词都会被匿名闭包转换为元组(word, 1)。为了查看转换关系,我们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可以使用reduceByKey动作进行字数统计,然后把统计结果写到磁盘。

  1. >>> counts = wc.reduceByKey(add)
  2. >>> counts.saveAsTextFile("wc")

一旦我们最终调用了saveAsTextFile动作,这个分布式作业就开始执行了,在作业“跨集群地”(或者你本机的很多进程)运行时,你应该可以看到很多INFO语句。如果退出解释器,你可以看到当前工作目录下有个“wc”目录。

  1. $ ls wc/
  2. _SUCCESS part-00000 part-00001

每个part文件都代表你本机上的进程计算得到的被保持到磁盘上的最终RDD。如果对一个part文件进行head命令,你应该能看到字数统计元组。

  1. $ head wc/part-00000
  2. (u'fawn', 14)
  3. (u'Fame.', 1)
  4. (u'Fame,', 2)
  5. (u'kinghenryviii@7731', 1)
  6. (u'othello@36737', 1)
  7. (u'loveslabourslost@51678', 1)
  8. (u'1kinghenryiv@54228', 1)
  9. (u'troilusandcressida@83747', 1)
  10. (u'fleeces', 1)
  11. (u'midsummersnightsdream@71681', 1)

注意这些键没有像Hadoop一样被排序(因为Hadoop中Map和Reduce任务中有个必要的打乱和排序阶段)。但是,能保证每个单词在所有文件中只出现一次,因为你使用了reduceByKey操作符。你还可以使用sort操作符确保在写入到磁盘之前所有的键都被排过序。

编写一个Spark应用

编写Spark应用与通过交互式控制台使用Spark类似。API是相同的。首先,你需要访问<SparkContext,它已经由<pyspark自动加载好了。

使用Spark编写Spark应用的一个基本模板如下:

  1. ## Spark Application - execute with spark-submit
  2. ## Imports
  3. from pyspark import SparkConf, SparkContext
  4. ## Module Constants
  5. APP_NAME = "My Spark Application"
  6. ## Closure Functions
  7. ## Main functionality
  8. def main(sc):
  9. pass
  10. if __name__ == "__main__":
  11. # Configure Spark
  12. conf = SparkConf().setAppName(APP_NAME)
  13. conf = conf.setMaster("local[*]")
  14. sc = SparkContext(conf=conf)
  15. # Execute Main functionality
  16. main(sc)

这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和Spark UI的可识别的应用名称,还有作为驱动程序运行的一些主要分析方法学。在ifmain中,我们创建了SparkContext,使用了配置好的context执行main。我们可以简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置通过setMaster方法被硬编码到SparkConf,一般你应该允许这个值通过命令行来设置,所以你能看到这行做了占位符注释。

使用<sc.stop()或<sys.exit(0)来关闭或退出程序。


  1. ## Spark Application - execute with spark-submit
  2. ## Imports
  3. import csv
  4. import matplotlib.pyplot as plt
  5. from StringIO import StringIO
  6. from datetime import datetime
  7. from collections import namedtuple
  8. from operator import add, itemgetter
  9. from pyspark import SparkConf, SparkContext
  10. ## Module Constants
  11. APP_NAME = "Flight Delay Analysis"
  12. DATE_FMT = "%Y-%m-%d"
  13. TIME_FMT = "%H%M"
  14. fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
  15. 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
  16. Flight = namedtuple('Flight', fields)
  17. ## Closure Functions
  18. def parse(row):
  19. """
  20. Parses a row and returns a named tuple.
  21. """
  22. row[0] = datetime.strptime(row[0], DATE_FMT).date()
  23. row[5] = datetime.strptime(row[5], TIME_FMT).time()
  24. row[6] = float(row[6])
  25. row[7] = datetime.strptime(row[7], TIME_FMT).time()
  26. row[8] = float(row[8])
  27. row[9] = float(row[9])
  28. row[10] = float(row[10])
  29. return Flight(*row[:11])
  30. def split(line):
  31. """
  32. Operator function for splitting a line with csv module
  33. """
  34. reader = csv.reader(StringIO(line))
  35. return reader.next()
  36. def plot(delays):
  37. """
  38. Show a bar chart of the total delay per airline
  39. """
  40. airlines = [d[0] for d in delays]
  41. minutes = [d[1] for d in delays]
  42. index = list(xrange(len(airlines)))
  43. fig, axe = plt.subplots()
  44. bars = axe.barh(index, minutes)
  45. # Add the total minutes to the right
  46. for idx, air, min in zip(index, airlines, minutes):
  47. if min > 0:
  48. bars[idx].set_color('#d9230f')
  49. axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
  50. else:
  51. bars[idx].set_color('#469408')
  52. axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
  53. # Set the ticks
  54. ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
  55. xt = plt.xticks()[0]
  56. plt.xticks(xt, [' '] * len(xt))
  57. # minimize chart junk
  58. plt.grid(axis = 'x', color ='white', linestyle='-')
  59. plt.title('Total Minutes Delayed per Airline')
  60. plt.show()
  61. ## Main functionality
  62. def main(sc):
  63. # Load the airlines lookup dictionary
  64. airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
  65. # Broadcast the lookup dictionary to the cluster
  66. airline_lookup = sc.broadcast(airlines)
  67. # Read the CSV Data into an RDD
  68. flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
  69. # Map the total delay to the airline (joined using the broadcast value)
  70. delays = flights.map(lambda f: (airline_lookup.value[f.airline],
  71. add(f.dep_delay, f.arv_delay)))
  72. # Reduce the total delay for the month to the airline
  73. delays = delays.reduceByKey(add).collect()
  74. delays = sorted(delays, key=itemgetter(1))
  75. # Provide output from the driver
  76. for d in delays:
  77. print "%0.0f minutes delayed\t%s" % (d[1], d[0])
  78. # Show a bar chart of the delays
  79. plot(delays)
  80. if __name__ == "__main__":
  81. # Configure Spark
  82. conf = SparkConf().setMaster("local[*]")
  83. conf = conf.setAppName(APP_NAME)
  84. sc = SparkContext(conf=conf)
  85. # Execute Main functionality
  86. main(sc)

使用<spark-submit命令来运行这段代码(假设你已有ontime目录,目录中有两个CSV文件):

  1. ~$ spark-submit app.py

这个Spark作业使用本机作为master,并搜索app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(如果你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,我们在app.py中使用matplotlib直接将结果可视化出来了:

这段代码做了什么呢?我们特别注意下与Spark最直接相关的main函数。首先,我们加载CSV文件到RDD,然后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回代表每行的元组。最后,我们将collect动作传给RDD,这个动作把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表(jump table),可以将航空公司代码与全名对应起来。我们将转移表存储为Python字典,然后使用sc.broadcast广播给集群上的每个节点。

接着,main函数加载了数据量更大的flights.csv([译者注]作者笔误写成fights.csv,此处更正)。拆分CSV行完成之后,我们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时间,并对浮点数进行合适的类型转换。每行作为一个NamedTuple保存,名为Flight,以便高效简便地使用。

有了Flight对象的RDD,我们映射一个匿名函数,这个函数将RDD转换为一些列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动作和add操作符可以得到每个航空公司的延误时间总和,然后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,并且使用matplotlib进行了可视化。

这个例子稍长,但是希望能演示出集群和驱动程序之间的相互作用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。

结论

尽管算不上一个完整的Spark入门,我们希望你能更好地了解Spark是什么,如何使用进行快速、内存分布式计算。至少,你应该能将Spark运行起来,并开始在本机或Amazon EC2上探索数据。你应该可以配置好iPython notebook来运行Spark。

Spark不能解决分布式存储问题(通常Spark从HDFS中获取数据),但是它为分布式计算提供了丰富的函数式编程API。这个框架建立在伸缩分布式数据集(RDD)之上。RDD是种编程抽象,代表被分区的对象集合,允许进行分布式操作。RDD有容错能力(可伸缩的部分),更重要的时,可以存储到节点上的worker内存里进行立即重用。内存存储提供了快速和简单表示的迭代算法,以及实时交互分析。

由于Spark库提供了Python、Scale、Java编写的API,以及内建的机器学习、流数据、图算法、类SQL查询等模块;Spark迅速成为当今最重要的分布式计算框架之一。与YARN结合,Spark提供了增量,而不是替代已存在的Hadoop集群,它将成为未来大数据重要的一部分,为数据科学探索铺设了一条康庄大道。

转载本站内容时,请务必注明来自W3xue,违者必究。
 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号