Spark 使用共享变量

为什么需要共享变量? 因为Spark是Share Nothing 架构,细节上,出现2个不同的问题:
1. spark应用程序(executor program)中定义的变量,会复制到所有计算节点上(请考虑持有的数据是很大量的情况);
2. 数据的计算是单向的,而且各计算节点的并行计算,不共享任何变量。

官方文档中的Shared Variables部分,指出Spark支持2种所谓的共享变量(全局变量): Broadcast Variables 和 Accumulator。
简单总结:前者是只读的,后者是只写的;前者解决第一个问题,后者解决第二个问题。

Broadcast Variables

主要出于性能优化。如:

val largeArray: Array[Int] = Array.fill(1e8){scala.util.Random.nextInt(10)} 

val data = sc.parallelize(Array(1,2,3,4))
data.map(largeArray.contains(_))

就不如以下性能好:

val largeArray: Array[Int] = Array.fill(1e8){scala.util.Random.nextInt(10)}
val broadcasted = sc.broadcast(largeArray)

val data = sc.parallelize(Array(1,2,3,4))
data.map(broadcasted.contains(_))

注意:在sc.broadcast(largeData)之后,就不要再修改largeData

那怎么会在spark 应用程序中 hardcode 这些大数据集呢?

没错,大多数场景,不应该出现这个,因为可以用hive,cassandra等分布式数据库存储数据,应用程序分片读取。
需要这个功能,通常因为首先是大数据集是固定的,且对比了数据库读取的代价。

补充:有人为了减少shuffle(重分区)--spark性能调优基本上就围绕这个,使用了这个功能:
http://fdahms.com/2015/10/04/writing-efficient-spark-jobs/

Accumulator

顾名思义就是累加器,“累加”可以有丰富的语义(实现)。由此定义的变量是在各计算节点间共享的。最终在 spark 应用程序中可以读取。
还没用到过。大概在spark监控时需要用到。

Back to top

comments powered by Disqus