← Back to list

[译]Spark RDD Programming Guide(2.4.3)

Published on: | Views: 73

原文地址: https://spark.apache.org/docs/latest/rdd-programming-guide.html

Overview

从高层次的角度来看, spark应用由一个驱动程序(运行用户的main函数)和在集群上执行各种并发的操作组成.spark的主要抽象是弹性分布式数据集(RDD), 它是跨集群节点的元素集合, 能被并发操作.RDD是从Hadoop文件系统(或者其他Hadoop支持的文件系统)或者驱动程序中现有的Scala集合创建的, 并对其进行转换.用户也可以让spark缓存RDD到内存, 让它可以在并发操作的时候高效地重用.最后, RDD会在节点失败时自动恢复.

spark中第二个抽象是能被并发操作使用共享变量 .默认情况下, 当spark以任务集的方式在不同节点上执行函数时, 它会将函数需要的每个变量的副本发送给每个任务.有时候, 一个变量需要在任务之间共享, 或者在任务和驱动程序之间共享.spark支持两种类型的共享变量:广播变量,可用于在所有节点上缓存内存中的值;累加器,它们是只用于“添加”的变量,例如计数器和求和器.

Initializing Spark

第一件要做的事,就是创建一个JavaSparkContext对象, 它告诉了spark如何访问一个群集.为了创建JavaSparkContext, 你首先需要创建一个SparkConf对象.

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName参数是应用名称, 它会在UI界面上显示.master是集群的主节点地址, 使用local指定在本地执行.实际上运行在集群中的时候, 不需要硬编码master地址, 而是将应用打包, 通过spark-submit命令传到集群执行.通常设置这个参数是为了使用local模式进行测试.

Resilient Distributed Datasets (RDDs)

Spark围绕着一个弹性分布式数据集(RDD)的概念展开,RDD是一个可以并行操作的容错元素集合。创建RDD有两种方法:并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或提供Hadoop输入格式的任何数据源。

Parallelized Collections

并行集合是通过在驱动程序程序中的现有集合上调用javasParallelize方法来创建的。复制集合的元素以形成可并行操作的分布式数据集。例如,下面介绍如何创建一个包含数字1到5的并行集合:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一旦创建,分布式数据集(distdata)就可以并行操作。例如,我们可以调用distdata.reduce((a,b)->a+b)来累加列表中的元素。稍后我们将描述分布式数据集上的操作。

并行集合的一个重要参数是分区数以将数据集剪切到其中。spark将为集群的每个分区运行一个任务。通常,你需要为集群中的每个CPU分配2-4个分区。通常,spark会根据集群自动设置分区数。但是,你也可以通过将它作为第二个参数传递给parallelize(例如sc.parallelize(data,10))来手动设置它。注意:代码中的某些地方使用术语slices(分区的同义词)来保持向后兼容性。

External Datasets

Spark可以从Hadoop支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件、序列文件和任何其他Hadoop输入格式。

可以使用sparkContexttext file方法创建文本文件RDD。此方法使用URI(计算机上的本地路径或hdfs:/s3a:/等URI)访问文件,并将其作为行集合读取。下面是一个调用示例:

JavaRDD<String> distFile = sc.textFile("data.txt");

一旦创建,distfile就可以通过数据集操作进行操作。例如,我们可以使用mapreduce操作,将所有行的大小相加,如下所示:distfile.map(s->s.length()).reduce((a,b)->a+b)

在spark中读取文件一些注意事项:

  • 如果路径在本地文件系统上,则该文件也必须可以在工作节点上的同一路径上访问。将文件复制到所有工作人员或使用网络共享文件系统。

  • Spark所有基于文件的输入方法,包括‘textfile’,都支持在目录、压缩文件和通配符上运行。例如,可以使用textfile(“/my/directory”)textfile(“/my/directory/*.txt”)textfile(“/my/directory/*.gz”)

  • textfile方法还接受一个可选的第二个参数,用于控制文件的分区数。默认情况下,spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但你也可以通过传递较大的值来请求更多的分区。请注意,分区不能少于块。

除了文本文件之外,spark的Java API还支持其他几种数据格式:

  • 对于序列化文件, 使用SparkContext的sequenceFile[K, V]方法, KV是文件中key和values的类型.它们必须是Hadoop的Writeable接口的子类, 例如IntWriteableText.

  • 对于其他Hadoop输入格式,可以使用JavaSparkContext.hadoopRDD方法,该方法采用任意JobConf和输入格式类、key类和value类。对于具有输入源的Hadoop作业, 使用相同的设置。当输入格式是基于新的MapReduce API (org.apache.hadoop.mapreduce)时, 你也可以使用 JavaSparkContext.newAPIHadoopRDD .

  • JavaRDD.saveAsObjectFile 和JavaSparkContext.objectFile支持以简单序列化的java对象格式保存RDD.虽然这不如像avro这样的专门格式有效,但它提供了一种保存任何RDD的简单方法。

RDD Operations

RDDs 有两种操作类型: - transformations: 使用原有数据集创建新数据集 - actions: 操作数据集之后返回结果给驱动程序(driver program) 例如map是一个transformations操作, 它将原数据集中每个元素转换后生成新的数据集. 例如reduce是一个action操作, 它会通过某个函数聚合数据集中所有的元素, 并将最终结果返回给驱动程序.

所有的transformations操作都是惰性(Lazy)的, 也就是说不会立即计算.spark仅仅只是记录这些操作, 这些操作计算的时机是当执行一个action返回结果给驱动程序时.这种推迟计算的机制让spark更具效率, 比如map操作后跟随一个reduce操作, 这样就会返回一个简单的结果给驱动程序, 而不是返回map操作之后的大数据集.

默认情况下, 每个transformation操作在每次运行action的时候都会被重复计算, 但是你可以将RDD持久化到内存中(使用persist或者cache方法), 这样spark会在群集中保留元素,以便下次访问时更快速.spark同样也支持保存RDD到硬盘或者在多个节点中复制.

示例

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行使用外部文件定义了一个基本RDD, 这个数据集没有加载到内存, lines变量仅仅指向文件. 第二行定义了lineLengths作为map操作的结果, 它是一个新的数据集, 当然并没有立即计算出来. 第三行我们运行了reduce操作, 它是一个action, 这时候spark开始在不同的机器上执行计算, 并汇总数据到驱动程序.

如果我们想要再次使用lineLengths, 我们可以在reduce之前添加

lineLengths.persist(StorageLevel.MEMORY_ONLY());

这样可以将lineLengths缓存到内存中, 在它第一次被计算的时候.

Passing Functions to Spark

传递函数给spark spark的api非常依赖从驱动程序传递函数给集群.在Java中, 函数指实现了org.apache.spark.api.java.function包中接口的类.这里有两种方法创建这些类: - 在自定义类中实现这些接口, 不论是命名或者匿名的类, 然后将类的实例传递给spark. - 使用lambda表达式创建函数 示例:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

注意, 在Java中的匿名类可以访问作用域中的final变量.spark会把这些变量的拷备复制到每个工作节点上.

理解作用范围

spark中比较困难的一件事是理解变量和方法的作用范围与生命周期, 当在群集中执行代码的时候. 常见的一种错误是RDD在它们的范围外修改变量.下面是一个使用foreach()来增加一个counter的值的例子.

考虑下面的求和代码, 根据是否在同一个JVM上执行, 结果会不一样. 一个常用的例子是, 这段代码运行在本地模式'local'和运行在集群模式'cluster'时, 返回的结果是不一样的.

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Local vs. cluster modes

上面的代码行为是未知的, 可能并不按照你的想法工作.为了执行jobs, spark将RDD的操作切分成tasks, 每个task被一个executor执行.在执行前, spark计算task的闭包(closure). 闭包是指executor在RDD上执行运算时必需看见的变量与方法(示例中的foreach()).这种闭包被序列化并发送到每个executor.

闭包里的变量通过复制发送到每个executor, 因此被'foreach'函数引用的counter变量,不再指向驱动程序中的counter变量.驱动程序中仍然有一个counter变量,但它对于executors是不可见的, executor只看到复制后的变量.因此, final变量counter仍然是0, 因为所有对counter的操作都是引用了被序列化于闭包中的那个.

在本地模式运行的时候, 在某些情况下, 'foreach'运行在相同的JVM中, 这时候访问的变量就是原来的那个, 那么counter才会被更新.

为了正确实现上面这类需求, 一种方法是使用Accumulator. spark中的Accumulators提供了一种安全更新变量的机制, 当执行被切分到集群执行的时候.

通常情况下, 闭包 - 类似于循环或者本地方法的结构, 不应该修改全局的状态. spark没有定义或者确定对闭包外部对像的修改行为.有些代码这样做可能在本地执行时能正常工作, 但在分布式环境下就失效了.需要全局联合统计的时候,请使用accumulator.

Printing elements of an RDD

打印RDD中的元素 另外一种常用的写法就是尝试去使用'rdd.foreach(println)'或者'rdd.map(println)'来打印RDD中的元素.但是在cluster模式下, 写到stdout中的数据会被写到executor的stdout管道中, 并不是驱动程序的stdout, 所以这些元素的打印就看不到了.如果需要在驱动程序端打印所有的元素, 可以使用collect()方法, 这个方法可以将RDD中的数据传输到驱动程序端, 类似这样rdd.collect().foreach(println), 因为collect()会把RDD中的所有数据传过来, 这太大了, 我们只需要打印一小部分时, 可以使用 rdd.take(100).foreach(println).

Working with Key-Value Pairs

大部分spark的RDD操作可以使用任意类型, 其中一些仅用在key-value对的RDD上.最常用的一个操作是shuffle, 它用于根据key聚合元素.

在java中, key-value对是用来自Scala标准库的scala.Tuple2类实现的, 你可以简单地调用new Tuple2(a,b)来创建一个键值对, 然后使用tuple._1()tuple._2()来访问它们.

key-value的RDDs是使用JavaPairRDD类实现的.你可以通过特定类型的map操作(像mapToPairflatMapToPair)从JavaRDDs创建JavaPairRDDs.JavaPairRDD拥有通用RDD操作和特定的key-value操作.

举个例子, 下面的代码使用reduceByKey操作来计算文件中每一行出现的次数:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

我们也可以使用 counts.sortByKey(), 比如按字典序对键值对排序, 然后使用counts.collect()把数据传回驱动程序端.

注意: 当在键值对中使用自定义对象时, 一定要确保equals()函数和hashCode()函数保持一致.

Transformations

The following table lists some of the common transformations supported by Spark. 下面的表格是spark支持的常用transformations操作

| Transformation | Meaning | |--|--| | map(func) | 通过func对每个元素进行转换, 返回新的分布式数据集| | filter(func) | 通过func对每个元素进行测试, 将结果为true的元素放到新的数据集并返回| | flatMap(func) | 类似于map操作, 不同的是每个元素可以映射到0~N个输出结果(所以func返回的是一个序列而不是单个元素)| | mapPartitions(func) | 类似于map操作,但是在运行时候是按照RDD每个分区去执行的,一次把分区中所有的数据传入到func中, 所以func需要使用这样的类型Iterator => Iterator, T是原类型, U是新类型, 如果内存不够的话, 这个操作有可能OOM| | mapPartitionsWithIndex(func) | 类似于mapPartitions操作, 但func多了一个整形参数表示分区的索引值, 所以func需要使用这样的类型(Int, Iterator) => Iterator | | sample(withReplacementfractionseed) |对数据进入采样,withReplacement表示元素是否可以重复参与采样, fraction是采用率,取值为[0,1], seed是随机种子| | union(otherDataset) |返回两个数据集的并集| | intersection(otherDataset) | 返回两个数据集的交集 | | distinct([numPartitions])) | 返回数据集去重后的数据, numPartitions指定任务数 | | groupByKey([numPartitions]) | 对键值对数据集进行聚合 (K,V)=>(K,Interable). 注意如果你是为了对每个key求聚合结果(比如求和或者平均值), 使用reduceByKey 或 aggregateByKey会更加高效.默认情况下,并发级别依赖RDD的分区数,但可以通过numPartitions指定 | | reduceByKey(func, [numPartitions]) | 对健值对进行按Key聚合 (K,V)=>(K,U), U是对每个key的值通过func聚合后的结果, 这里U的类型和V的类型是一样的 | | aggregateByKey(zeroValue)(seqOpcombOp, [numPartitions]) | 对键值对按Key进行聚合(K,V)=>(K,U),这里U的类型和V的类型可以不一样, zeroValue是初始值, seqOp是(U,V)=>U的变换, combOp是(U,U)=>U的变换| | sortByKey([ascending], [numPartitions]) | 按Key排序,ascending指定是否升序| | join(otherDataset, [numPartitions]) | 合并数据集 (K, V) 和 (K, W) => (K, (V, W)) ,类似的join操作有leftOuterJoinrightOuterJoin 和 fullOuterJoin. | | cogroup(otherDataset, [numPartitions]) | 合并数据集并按Key聚合 (K, V) , (K, W) => (K, (Iterable, Iterable)), 这个操作也可以用groupWith. | | cartesian(otherDataset) | 按笛卡尔积的方式合并数据集 T 和 U, 返回键值对数据集 (T, U). | | pipe(command[envVars]) | 对RDD每个分区执行shell命令或者Perl脚本, RDD中的元素从stdin输入, 脚本执行后通过stdout作为strings输出到RDD,并返回 | | coalesce(numPartitions) | 减少RDD的分区数到numPartitions, 通常是在filter操作于大数据集之后比较有效 | | repartition(numPartitions) | 将RDD中的数据重新分区, 以平衡负载 | | repartitionAndSortWithinPartitions(partitioner) |将RDD中的数据重新分区并排序 |

Actions

下面的列表显示了spark中常用的action. | Action | Meaning | |--|--| | reduce(func) | 聚合数据集中的元素的结果, 通过func函数(将两个元素合并为一个), func要满足交换率和结合率 | | collect() | 返回数据集中的所有元素到驱动程序 | | count() | 返回数据集中元素的总数 | | first() | 返回数据集中第一个元素,等效于take(1)| | take(n) | 返回数据集中前n个元素| | takeSample(withReplacementnum, [seed]) |采样后返回数据集中前num个元素 | | takeOrdered(n[ordering]) | 取数据集中前n个元素, 对它们进行排序后返回 | | saveAsTextFile(path) | 将数据集中的元素写入到一个文本文件, 这个文本文件可以存放在本地文件系统,HDFS或者其他Hadoop支持的文件系统, spark是调用元素的toString方法来转换元素为text,并放到文件中一行里| | saveAsSequenceFile(path)  | 将数据集中的元素保存到本地文件系统/HDFS或其他支持Hadoop文件系统目录下, 作为一个Hadoop SequenceFile, 这个操作只对实现了Hadoop的Writeable接口的键值对数据集有效(或者键值对是基本数据类型, spark默认实现隐式转换)| | saveAsObjectFile(path) | 将元素以序列化的方式保存到文件, 可以通过SparkContext.objectFile()把数据加载回来 | | countByKey() | 仅对键值对RDD有效, 按Key计算值的个数, 返回hashmap| | foreach(func) | 对每个元素执行func操作, 通常用来更新一个Accumulator, 无返回值|

spark RDD API 的某些操作有异步版本, 像foreach的异步版本foreachAsync, 立即返回一个FutureAction给调用者, 而不是阻塞在这里.

Shuffle operations

一些spark的操作会触发shuffle事件, shuffle是spark的一种机制, 用于重新按组分配数据到不同的分区上.这通常影响到在executor和机器之间复制数据, 这使得shuffle是一个复杂和耗时的操作.

Background

为了弄明白shuffle阶段发生了什么, 我们来看一个reduceByKey操作的例子. reduceByKey操作产生一个新的RDD, 一个key和它所有的value被合并为一个tuple, value是通过func进行合并的. 这里的困难在于, key的所有value不一定在同一个分区中, 甚至不是同一个机器, 但是它们必须位于同一位置才能计算结果.

在spark中, 数据通常不分布在各分区间, 而是分布在特定操作需要的位置. 在计算的时候, 单个任务在单个分区上面执行--因此, 为了组织reduceByKey单个任务需要的数据, spark需要执行一个all-to-all的操作.这个操作需要读取所有分区来查找所有keys的所有value, 然后把相应key的所有value放到一起来计算最终结果, 这就是shuffle.

虽然shuffle产生的数据每个分区中的元素都是确定的, 分区本身的顺序也是确定的, 但元素的顺序不是.如果希望suffle之后的元素顺序是确定的, 那么可以使用:

  • 使用mapPartitions来排序每个分区的数据, 例如.sorted
  • 使用repartitionAndSortWithinPartitions在重新分区的同时高效地进行排序.
  • 使用sortBy创建全局有序的RDD

触发shuffle事件的操作包括: - repartition操作, 如repartition - ByKey操作, 如groupByKey, reduceByKey - join操作, 如cogroup, join

Performance Impact

Shuffle是一个很昂贵的操作, 因为它影响磁盘IO,数据序列化, 网络IO. 为了组织数据进行shuffle, spark产生许多的map任务来组织数据, 同样产生许多reduce任务来聚合数据.这种命名来自MapReduce, 并不直接和spark的map,reduce操作相关.

单独的map任务的结果保持在内存中,直到内存不够用为止.然后, 跟据目标分区对它们进行排序, 写到单个文件中.在reduce端, 任务读取相关的排序好的数据块.

某些shuffle操作会显箸消耗堆内存, 因为它们在传输数据前后使用内存数据结构来组织数据.具体来说,  reduceByKey 和 aggregateByKey在map端创建他们的结构, 而ByKey操作在reduce端创建结构.当内存不能满足数据大小时, spark会将数据写入到磁盘中, 这会带来额外的磁盘IO和垃圾回收操作.

Shuffle会在磁盘中生成大量的临时文件.从spark1.3开始, 这些文件会保留, 直到对应的RDD因不再被使用被垃圾回收时.这样做, 是为了重新计算的时候, 不需要重新创建shuffle文件.如果保留对这些RDD的引用或者GC并不经常执行, 垃圾回收可能会在很长时间之后.这意味着长时间运行的spark任务会消耗大量的磁盘空间.这个临时存储目录由spark.local.dir配置参数指定.

可以使用多种配置参数来调优Shuffle的行为.查看Spark Configuration Guide.

RDD Persistence

spark最重要功能之一, 是在多个操作之间, 将数据集到持久化(缓存)到内存. 当你持久化一个RDD时, 每个节点存储它们在内存中计算的分区数据, 并且在其他访问这些数据集(或者它们的派生数据集)的操作上重用这些数据.这可以让后面的操作更快(通常10x倍速度以上).缓存是迭代算法及快速交互使用的关键工具.

可以使用persist()或者cache()方法来缓存RDD.当RDD第一次被计算的时候, 它会保存到节点们的内存中.spark的缓存是故障容错的--如果RDD的任何分区丢失了, 它会被自动地使用原来的transformations计算出来.

另外, 每个RDD可以使用不同的存储级别来持久化, 比如, 持久化数据集到磁盘, 持久化到内存,复制到其他节点.这些级别是通过传递参数StorageLevel对象到persist()实现的.cache方法是一个使用默认存储级别(StorageLevel.MEMORY_ONLY)的快捷方法.下面是全部的存储级别:

| Storage Level | Meaning | |--|--| | MEMORY_ONLY | 默认级别, 将反序化后的RDD存储在JVM中.如果内存不够, 一些分区不会被缓存, 在需要的时候会被重新计算| | MEMORY_AND_DISK | 将反序化后的RDD存储在JVM中.如果内存不够, 将分区存放到磁盘中, 在需要的时候从磁盘读取出来| | MEMORY_ONLY_SER | 将序列化后的RDD存放到JVM中, 这通常能节省内存空间, 但是读取的时候会慢一些, 并且消耗掉一些CPU| | MEMORY_AND_DISK_SER | 类似于MEMORY_ONLY_SER,但是内存不够用的话, 多的数据会保存到磁盘 | | DISK_ONLY | 将分区存放到磁盘 | | MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 和上面一样, 但会复制数据到两个集群节点上| | OFF_HEAP (experimental) |类似于MEMORY_ONLY_SER, 但是数据保存到 off-heap内存中|

spark也会自动保存一些临时数据, 在做shuffle的时候(如reduceByKey), 即使用户没有调用persist.这样做是为了避免当shuffle操作节点失败的时候整个输入都要重新计算.仍然推荐在计划重用RDD时,手动调用persist.

Which Storage Level to Choose

spark的存储级别是为了在内存使用率和CPU效率之间提供不同的权衡. 我们推荐使用下面的流程来选择: - 如果你的RDD适合默认的存储级别(MEMORY_ONLY), 那就不要修改它.这是最有CPU效率的选项,让RDD上的操作执行得最快. - 如果RDD不适合, 尝试使用MEMORY_ONLY_SER, 并选择一个快速的序列化库, 这通常能节省空间, 速度也比较快. - 一般不要溢出数据到磁盘, 除非计算的数据非常耗时,或者数据是从非常大的数据集中过滤而来.一般而言, 重新计算数据和从磁盘中读取速度差不多. - 如果你需要快速的错误恢复(例如使用spark来服务于web应用), 可以使用复制数据级别.所有的存储级别都有容错性(通过重新计算), 但复制级别可以不需要等待重新计算就能继续执行任务.

Removing Data

spark自动监控每个节点上的缓存使用, 并使用LRU(最近最少使用)策略移除旧的数据分区.如果你想手动删除缓存, 使用RDD.unpersist()方法.

Shared Variables

通常情况下, 把一个函数传递给spark操作(像mapreduce)时是在远程集群节点上执行的, 它在函数使用的所有变量的独立副本上工作.这些变量被复制到每个机器, 并且没有更新会从远程机器传回驱动程序.支持在任务之间共享通用读写变量是低效的.但是,spark确实为两种常见的使用模式提供了两种有限类型的共享变量: broadcast variables 和 accumulators.

Broadcast Variables

广播变量允许开发者在每台机上保持一个只读变量, 而不是复制一份发给每个任务.例如, 它们可以为每个节点有效地提供一份大的数据集.spark还尝试使用更有效率的广播算法来分配广播变量, 以节省通信开销.

spark actions执行时被"shuffle"操作划分成多个阶段.spark自动广播每个阶段任务需要的共享数据.被广播的数据是以序列化的方式缓存的, 并且在每个任务运行之间进行反序列化.这意味着, 显示创建广播变量只有在任务跨越多个阶段使用同样的数据时,或者以反序列化存放数据非常重要时, 才有效果.

使用变量v创建广播变量调用SparkContext.broadcast(v).广播变量被包装到v里面, 可以调用 value方法来访问它的值, 代码如下:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]

一旦广播变量被创建, 集群上运行的任何函数都要使用这个变量而不是v, 以避免v被多次传输到相同的节点.另外, 对象v在广播之后不应该被修改, 以保证所有的节点获取的是相同的值(例如变量稍后被传输到新的节点).

Accumulators

累加器是一种变量, 通过关联和交换操作实现的增加, 因此可以高效地支持并发.它们用来实现计数器(比如在MapReduce中)或者求和.spark本身支持数字类型的累加器, 同时开发者也可以自己实现对新类型的支持.

作为用户, 你可以创建命名或者匿名的累加器.像下面这个图片, 一个命令的累加器(这里叫counter)将会在web页面上显示不同阶段对数值的修改. spark在“tasks”表中显示由任务修改的每个累加器的值.

Accumulators in the Spark UI 在UI上跟踪累加器对于理解运行阶段非常有用(注意在Python上暂时不支持).

创建一个数值型的累加器, 可以调用SparkContext.longAccumulator()或者SparkContext.doubleAccumulator(), 来累加Long或者Double类型的累加值.然后运行在集群上的任务可以调用add方法来增加它.然而, 它们不能读取累加器的值.只有驱动程序可以读取累加器的值, 使用value方法.

下面的代码展示一个累加器用于累加数组中的所有元素

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

上面的代码是使用内建的累加器类型Long, 开发者可以通过继承AccumulatorV2类来创建自己的类型.虚基类AccumulatorV2有几个方法需要被重写: reset用于清除累加器的值到0, add用于累加另外一个值到累加器, merge用于合并相同类型累加器的值到一个.其他需要覆盖的方法在API文档中API documentation.举个例子, 我们有一个类MyVector实现了数据向量, 我们可以这样写:

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

注意, 当开发者定义自己的AccumulatorV2类型时, 生成的类型和添加的元素类型可能不一样.

对于仅在action操作中更新的累加器, spark保证累加只执行一次, 也就是说重启任务不会更新累加器的值.在transformation操作中, 用户需要明确, 如果任务或者工作阶段重新执行的话, 每个任务的更新会被应用多次.

累加器并没有改变spark的惰性模型.如果累加器在RDD的操作中被更新, 它们的值只在RDD被计算时更新一次.因此, 在像map()之类的惰性转换操作时, 累加器的更新不会被执行.下面的代码演示了这一特性:

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.