Spark Notes: The So-Called RDD Programming Guide


Continuing from the previous post

In the previous post I translated the Quick Start section of the official Spark documentation. At the very end, the official docs point to where to go next:

For an in-depth overview of the API, start with the RDD Programming Guide and the SQL Programming Guide, or see the programming guides for other components.


The chapter index in the sidebar turned out to be broken, so I am adding a fresh one here:

RDD Programming Guide

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
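To make the two kinds of shared variables concrete, here is a minimal PySpark sketch; it assumes an existing SparkContext named sc, and the lookup table and word list are made up for illustration:

lookup = sc.broadcast({"a": 1, "b": 2})   # read-only value cached on every node
misses = sc.accumulator(0)                # tasks can only add to it; the driver reads it

words = sc.parallelize(["a", "b", "c"])
words.foreach(lambda w: misses.add(0 if w in lookup.value else 1))
print(misses.value)                       # -> 1, visible only on the driver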

This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Linking with Spark

Spark 2.4.0 works with Python 2.7+ or Python 3.4+. It can use the standard CPython interpreter, so C libraries like NumPy can be used. It also works with PyPy 2.3+.

Python 2.6 support was removed in Spark 2.2.0.

Spark applications in Python can either be run with the bin/spark-submit script which includes Spark at runtime, or by including it in your setup.py as:

install_requires=[
        'pyspark==2.4.0'  # pin to the Spark version you run against (2.4.0 in this guide)
    ]

To run Spark applications in Python without pip installing PySpark, use the bin/spark-submit script located in the Spark directory. This script will load Spark’s Java/Scala libraries and allow you to submit applications to a cluster. You can also use bin/pyspark to launch an interactive Python shell.

If you wish to access HDFS data, you need to use a build of PySpark linking to your version of HDFS. Prebuilt packages are also available on the Spark homepage for common HDFS versions.

Finally, you need to import some Spark classes into your program. Add the following line:

from pyspark import SparkContext, SparkConf

PySpark requires the same minor version of Python in both driver and workers. It uses the default python version in PATH, you can specify which version of Python you want to use by PYSPARK_PYTHON, for example:

$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.
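As a sketch of that advice (the application name and file name below are made up), you can leave setMaster() out entirely and let spark-submit supply the master URL at launch time:

conf = SparkConf().setAppName("my-app")   # no setMaster(): the master comes from spark-submit
sc = SparkContext(conf=conf)

# launched e.g. as:
#   ./bin/spark-submit --master local[4] my_app.py   # local testing
#   ./bin/spark-submit --master yarn my_app.py       # on a cluster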

Using the Shell

In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add Python .zip, .egg or .py files to the runtime path by passing a comma-separated list to --py-files. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. Any Python dependencies a Spark package has (listed in the requirements.txt of that package) must be manually installed using pip when necessary. For example, to run bin/pyspark on exactly four cores, use:

$ ./bin/pyspark --master local[4]

Or, to also add code.py to the search path (in order to later be able to import code), use:

$ ./bin/pyspark --master local[4] --py-files code.py

For a complete list of options, run pyspark --help. Behind the scenes, pyspark invokes the more general spark-submit script.

It is also possible to launch the PySpark shell in IPython, the enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To use IPython, set the PYSPARK_DRIVER_PYTHON variable to ipython when running bin/pyspark:

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

To use the Jupyter notebook (previously known as the IPython notebook),

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

You can customize the ipython or jupyter commands by setting PYSPARK_DRIVER_PYTHON_OPTS.

After the Jupyter Notebook server is launched, you can create a new “Python 2” notebook from the “Files” tab. Inside the notebook, you can input the command %pylab inline as part of your notebook before you start to try Spark from the Jupyter notebook.
(For the record, I created a "Python 3" notebook rather than a "Python 2" one.)

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. We describe operations on distributed datasets later on.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
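A quick way to see the partitioning in action (a sketch; the numbers are arbitrary):

data = range(1, 101)
dist_data = sc.parallelize(data, 10)          # explicitly ask for 10 partitions
print(dist_data.getNumPartitions())           # -> 10
print(dist_data.glom().map(len).collect())    # how many elements ended up in each partition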

External Datasets

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and reads it as a collection of lines. Here is an example invocation:

>>> distFile = sc.textFile("data.txt")

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).

Some notes on reading files with Spark:

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz") (see the example after this list).
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
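The example promised above, illustrating the directory, wildcard, and partition-count notes (the paths are hypothetical):

whole_dir = sc.textFile("/my/directory")                # every file in the directory
txt_only = sc.textFile("/my/directory/*.txt")           # wildcard match
gz_files = sc.textFile("/my/directory/*.gz")            # compressed files are decompressed on read
more_parts = sc.textFile("data.txt", minPartitions=20)  # ask for more partitions than HDFS blocks
print(more_parts.getNumPartitions())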

Apart from text files, Spark’s Python API also supports several other data formats:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
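A small sketch of the two APIs just mentioned (the paths are hypothetical, and the pickle output directory must not already exist):

files = sc.wholeTextFiles("/my/directory")   # RDD of (filename, whole file content) pairs
lines = sc.textFile("/my/directory")         # RDD of individual lines, for comparison

files.saveAsPickleFile("/tmp/pickled_pairs", batchSize=10)  # 10 is also the default batch size
restored = sc.pickleFile("/tmp/pickled_pairs")
print(restored.count() == files.count())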
SequenceFile and Hadoop Input/Output Formats

Note this feature is currently marked Experimental and is intended for advanced users. It may be replaced in future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach.

Writable Support

PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the resulting Java objects using Pyrolite. When saving an RDD of key-value pairs to SequenceFile, PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following Writables are automatically converted:

Writable Type: Python Type
Text: unicode str
IntWritable: int
FloatWritable: float
DoubleWritable: float
BooleanWritable: bool
BytesWritable: bytearray
NullWritable: None
MapWritable: dict

What on earth is a bytearray?! It is Python's built-in mutable byte-sequence type (also exposed through the C API); see here.

Arrays are not handled out-of-the-box. Users need to specify custom ArrayWritable subtypes when reading or writing. When writing, users also need to specify custom converters that convert arrays to custom ArrayWritable subtypes. When reading, the default converter will convert custom ArrayWritable subtypes to Java Object[], which then get pickled to Python tuples. To get Python array.array for arrays of primitive types, users need to specify custom converters.

Saving and Loading SequenceFiles

Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value classes can be specified, but for standard Writables this is not required.

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

Saving and Loading Other Hadoop Input/Output Formats

PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs. If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the Elasticsearch ESInputFormat:

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

Note that, if the InputFormat simply depends on a Hadoop configuration and/or input path, and the key and value classes can easily be converted according to the above table, then this approach should work well for such cases.

If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler. A Converter trait is provided for this. Simply extend this trait and implement your transformation code in the convert method. Remember to ensure that this class, along with any dependencies required to access your InputFormat, are packaged into your Spark job jar and included on the PySpark classpath.

See the Python examples and the Converter examples for examples of using Cassandra / HBase InputFormat and OutputFormat with custom converters.

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

In which scenarios is groupByKey actually a better fit than reduceByKey?

For heavy aggregation over large datasets, reduceByKey is preferable to groupByKey.

Also, if all you need is group-style processing, the following should be preferred over groupByKey (see the sketch after this list):
  (1) combineByKey combines the data, but the type of the combined result can differ from the type of the input values.
  (2) foldByKey merges all the values for each key, using an associative function together with a "zero value".
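The sketch promised above; it contrasts the four operators on a toy RDD of (key, 1) pairs:

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

counts = pairs.reduceByKey(lambda a, b: a + b)        # combines within each partition before shuffling
counts_slow = pairs.groupByKey().mapValues(sum)       # ships every single value before aggregating
counts_fold = pairs.foldByKey(0, lambda a, b: a + b)  # like reduceByKey, with an explicit zero value

# combineByKey: the combined type ((sum, count) tuples) differs from the input value type (int)
sum_count = pairs.combineByKey(lambda v: (v, 1),
                               lambda acc, v: (acc[0] + v, acc[1] + 1),
                               lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(counts.collect(), counts_fold.collect(), sum_count.collect())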

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.


By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
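For example (a sketch, reusing the data.txt file from earlier):

from pyspark import StorageLevel

lines = sc.textFile("data.txt")
lines.cache()                              # keep the computed partitions in memory

words = sc.textFile("data.txt").flatMap(lambda s: s.split(" "))
words.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk if they do not fit in memory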

Basics

To illustrate RDD basics, consider the simple program below:

lines = sc.textFile("data.txt")
line_lengths = lines.map(lambda s: len(s))
total_length = line_lengths.reduce(lambda a, b: a + b)

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

If we also wanted to use lineLengths again later, we could add:

line_lengths.persist()

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this:

Lambda expressions, for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or statements that do not return a value.)
Local defs inside the function calling into Spark, for longer code.
Top-level functions in a module.

For example, to pass a longer function than can be supported using a lambda, consider the code below:

if __name__ == "__main__":
    def my_func(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(my_func)

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

class MyClass(object):
    def func(self, s):
        return s
    def do_stuff(self, rdd):
        return rdd.map(self.func)

Here, if we create a new MyClass and call doStuff on it, the map inside there references the func method of that MyClass instance, so the whole object needs to be sent to the cluster.


In a similar way, accessing fields of the outer object will reference the whole object:

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def do_stuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

def do_stuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

(In effect, this just copies the member variable into a local variable.)

Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

Of course, personally I think reaching for global here is a pretty bad idea in the first place.

Local vs. cluster modes

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
(In other words, the closure variables are effectively passed to each task by value.)

In local mode, in some circumstances, the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.

To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
(That is: the accumulator is how you avoid the problems described above; a sketch follows.)
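Here is what the earlier counter example looks like when rewritten with an Accumulator (a sketch; data is assumed to be a list of numbers):

counter = sc.accumulator(0)
rdd = sc.parallelize(data)

rdd.foreach(lambda x: counter.add(x))   # updates are merged back safely on the driver
print("Counter value:", counter.value)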

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
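The println snippets above are Scala; in PySpark the same ideas look roughly like this (rdd is any existing RDD):

for element in rdd.collect():   # pulls the whole RDD to the driver; may exhaust its memory
    print(element)

for element in rdd.take(100):   # safer: only the first 100 elements
    print(element)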

Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

In Python, these operations work on RDDs containing built-in Python tuples such as (1, 2). Simply create such tuples and then call your desired operation.

For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as a list of objects.
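Continuing the counts example above, that might look like (a sketch):

sorted_counts = counts.sortByKey()        # sort the (line, count) pairs alphabetically by key
for line, n in sorted_counts.collect():   # bring them back to the driver as a list of tuples
    print(line, n)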

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

The original table is here; it is too long, so instead of copying it verbatim I only give a condensed rendering of each entry below.

Transformation: Meaning

map(func): Return a new distributed dataset formed by passing each element of the source through the function func.
filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U>.
sample(withReplacement, fraction, seed): Sample a fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset): Return a new dataset containing the union of the elements in the source dataset and the argument.
intersection(otherDataset): Return a new dataset containing the intersection of the elements in the source dataset and the argument.
distinct([numPartitions]): Return a new dataset containing the distinct elements of the source dataset (i.e. deduplication).
groupByKey([numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: if you are grouping in order to perform an aggregation (such as a sum or average) over each key, reduceByKey or aggregateByKey will give much better performance.
Note: by default, the level of parallelism in the output depends on the number of partitions of the parent RDD; you can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. As with groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]): When called on a dataset of (K, V) pairs where K is ordered, returns a dataset of (K, V) pairs sorted by key in ascending or descending order, as specified by the boolean ascending argument.
join(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin, and lines written to its stdout are returned as an RDD of strings.
coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance the data across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sort down into the shuffle machinery.
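A few of the transformations above, strung together on a toy dataset (a sketch):

lines = sc.parallelize(["hello world", "hello spark"])
words = lines.flatMap(lambda line: line.split(" "))   # flatMap: one line -> many words
pairs = words.map(lambda w: (w, 1))

print(words.distinct().collect())                     # deduplicated words
print(pairs.reduceByKey(lambda a, b: a + b).collect())
print(pairs.join(sc.parallelize([("spark", "framework")])).collect())   # -> [('spark', (1, 'framework'))]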

Action

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Apache, could you please just explain things in prose instead of yet another table... the original table is here.

Action: Meaning

reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count(): Return the number of elements in the dataset.
first(): Return the first element of the dataset (similar to take(1)).
take(n): Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, sampled with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path): (Java and Scala)
saveAsObjectFile(path): (Java and Scala)
countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func): Run the function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of foreach() may result in undefined behavior. See the Understanding closures section for more details.
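And a few of the actions above on a toy RDD (a sketch; the output path is hypothetical and must not already exist):

nums = sc.parallelize(range(1, 6))

print(nums.reduce(lambda a, b: a + b))              # 15
print(nums.count(), nums.first(), nums.take(3))     # 5, 1, [1, 2, 3]
print(nums.map(lambda x: (x % 2, x)).countByKey())  # elements per key: {1: 3, 0: 2}
nums.saveAsTextFile("/tmp/nums_out")                # one text file per partition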