Spark Notes: The First Spark Program

The First Spark Program

# Step 1: import the required Spark module
from pyspark import SparkContext

# Step 2: instantiate a SparkContext object
sc = SparkContext(master='local', appName='pyspark')

def isprime(n):
    """Return True if n is prime."""
    n = abs(int(n))
    if n < 2:
        return False
    if n == 2:
        return True
    if not n & 1:
        return False
    for x in range(3, int(n**0.5) + 1, 2):
        if n % x == 0:
            return False
    return True

# Step 3: count the primes below 1,000,000 in parallel
nums = sc.parallelize(list(range(1000000)))
prime_count = nums.filter(isprime).count()
print(prime_count)

If this prints a number without any errors, the setup is basically working.
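The later sections use the spark SparkSession that the pyspark shell provides. If you keep working in a plain Python interpreter instead, here is a minimal, hedged sketch for wrapping up this first program and getting an equivalent session yourself (the app name 'quickstart' is arbitrary):

# Release the SparkContext created above once you are done with it
sc.stop()

# Outside the pyspark shell, build the SparkSession used in the examples below
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('quickstart').getOrCreate()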

Hello World

Original article: the official Quick Start guide

Of course one cannot skip writing a "Hello World" in Spark. Assume the current directory is ~/spark_demo and it contains a file named hello_world.txt whose content is "hello world". This part follows the Quick Start section of the official site; out of a mild aversion to Java-style variable names, it differs slightly from the original.
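If the file does not exist yet, it can be created with plain Python before launching the shell; a minimal sketch assuming the ~/spark_demo path mentioned above:

import os

demo_dir = os.path.expanduser('~/spark_demo')
os.makedirs(demo_dir, exist_ok=True)
with open(os.path.join(demo_dir, 'hello_world.txt'), 'w') as f:
    f.write('hello world')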

Basics

Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Due to Python’s dynamic nature, we don’t need the Dataset to be strongly-typed in Python. As a result, all Datasets in Python are Dataset[Row], and we call it DataFrame to be consistent with the data frame concept in Pandas and R.

>>> text_file = spark.read.text('hello_world.txt')

You can get values from DataFrame directly, by calling some actions, or transform the DataFrame to get a new one. For more details, please read the API doc.

>>> text_file
DataFrame[value: string]
>>> text_file.count()
1
>>> text_file.first()
Row(value='hello world')

Now let’s transform this DataFrame to a new one. We call filter to return a new DataFrame with a subset of the lines in the file.

>>> lines_with_spark = text_file.filter(text_file.value.contains('hello'))

We can chain together transformations and actions:

>>> text_file.filter(text_file.value.contains('hello')).count()  # number of lines containing 'hello'
1

More on Dataset Operations

>>> text_file = spark.read.text('hello_world.txt')
>>> from pyspark.sql.functions import *
>>> text_file.select(size(split(text_file.value, "\s+")).name("num_words")).agg(max(col("num_words"))).collect()
[Row(max(num_words)=2)]

This first maps a line to an integer value and aliases it as "num_words", creating a new DataFrame. agg is called on that DataFrame to find the largest word count. The arguments to select and agg are both Column; we can use df.colName to get a column from a DataFrame. We can also import pyspark.sql.functions, which provides a lot of convenient functions to build a new Column from an old one.
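As a sanity check, the chained expression can be split into its two steps; a minimal sketch using the same text_file, with the hypothetical intermediate name words_per_line:

from pyspark.sql.functions import size, split, col, max as max_

# Step 1: one row per line, holding the number of whitespace-separated words
words_per_line = text_file.select(size(split(text_file.value, "\s+")).name("num_words"))

# Step 2: reduce that single column to its maximum
words_per_line.agg(max_(col("num_words"))).collect()  # [Row(max(num_words)=2)], as above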

One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:

>>> word_counts = text_file.select(explode(split(text_file.value, "\s+")).alias("word")).groupBy("word").count()
>>> word_counts
DataFrame[word: string, count: bigint]

Here, we use the explode function in select, to transform a Dataset of lines to a Dataset of words, and then combine groupBy and count to compute the per-word counts in the file as a DataFrame of 2 columns: “word” and “count”. To collect the word counts in our shell, we can call collect:

>>> word_counts.collect()
[Row(word='hello', count=1), Row(word='world', count=1)]

Caching

Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our lines_with_spark dataset to be cached:

>>> lines_with_spark.cache()
DataFrame[value: string]
>>> lines_with_spark.count()
1
>>> lines_with_spark.count()
1
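Once the cached data is no longer needed, the cache can be released again; a minimal sketch:

lines_with_spark.unpersist()  # drop the in-memory copy when it is no longer needed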

It may seem silly to use Spark to explore and cache a one-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting bin/pyspark to a cluster, as described in the RDD programming guide.
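In a self-contained script, pointing the same code at a cluster is just a different master URL on the builder; the URL below is a hypothetical standalone-cluster address, shown as a sketch only:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('quickstart-on-cluster')    # arbitrary application name
         .master('spark://master-host:7077')  # hypothetical standalone master URL
         .getOrCreate())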

Self-Contained Applications

Suppose we wish to write a self-contained application using the Spark API. We will walk through a simple application in Scala (with sbt), Java (with Maven), and Python (pip).

Now we will show how to write an application using the Python API (PySpark).
If you are building a packaged PySpark application or library you can add it to your setup.py file as:

install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]
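In context, that fragment sits inside an ordinary setuptools setup() call; a minimal sketch of such a setup.py, with a hypothetical package name and an illustrative version pin (match the Spark version you actually run):

from setuptools import setup, find_packages

setup(
    name='simple-app',        # hypothetical package name
    version='0.1.0',
    packages=find_packages(),
    install_requires=[
        'pyspark==3.5.0',     # illustrative pin; use your Spark version
    ],
)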

As an example, we’ll create a simple Spark application, SimpleApp.py:

"""SimpleApp.py"""
>>> from pyspark.sql import SparkSession
>>> log_file = 'hello_world.txt'
>>> spark = SparkSession.builder.appName('SimpleApp').getOrCreate()
>>> spark
<pyspark.sql.session.SparkSession object at 0x7f5fc50c73c8>
>>> log_data = spark.read.text(log_file).cache()
>>> log_data
DataFrame[value: string]
>>> num_os = log_data.filter(log_data.value.contains('o')).count()
>>> num_os
1
>>> num_ls = log_data.filter(log_data.value.contains('l')).count()
>>> num_ls
1
>>> print('Lines with o: {}, lines with l: {}'.format(num_os, num_ls))
Lines with o: 1, lines with l: 1

>>> spark.stop()

This program just counts the number of lines containing 'o' and the number of lines containing 'l' in a text file. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkSession to create Datasets. For applications that use custom classes or third-party libraries, we can also add code dependencies to spark-submit through its --py-files argument by packaging them into a .zip file (see spark-submit --help for details). SimpleApp is simple enough that we do not need to specify any code dependencies.
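Dependencies can also be attached from inside the application itself via SparkContext.addPyFile, which accepts a .py or .zip path and ships it to the executors; a minimal sketch with a hypothetical deps.zip:

spark.sparkContext.addPyFile('deps.zip')  # hypothetical archive of helper modules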

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with o: 1, lines with l: 1

If you have PySpark pip installed into your environment (e.g., pip install pyspark), you can run your application with the regular Python interpreter or use the provided ‘spark-submit’ as you prefer.

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with o: 1, lines with l: 1

Where to Go from Here

Congratulations on running your first Spark application!

For an in-depth overview of the API, start with the RDD programming guide and the SQL programming guide, or see “Programming Guides” menu for other components.
For running applications on a cluster, head to the deployment overview.
Finally, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run them as follows:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

--EOF--
