Spark无疑是非常广泛使用的大数据引擎了,前一段时间同事去挖了Spark最早release出来的版本给我们做了下介绍,突然觉得很有趣,确实代码一直都是有记忆的,最早版本的Spark其实一直在github上静静地躺着,无数后面的故事都是从这个初版的代码发展而来。

想学习Spark的基本概念其实从这个0.1版本的代码,搭配原始论文Spark: Cluster Computing with Working Sets一起来看,会发现一些核心的思路在这时候就已经基本都有了,同时又可以让我们很容易去理解Spark的一些设计思路。整个论文给我的感觉是很"实在",介绍了设计和实现的细节,有一种大作业写一个toy system的既视感,作者应该也没想到Spark会发展成现在的模样。

Motivation

Spark想解决一个问题,就是数据(在内存中)的复用性,作者发现在这种情况下MapReduce不是很高效

  • 迭代式任务:机器学习算法往往需要重复作用一个数据集去优化一个参数,每一次迭代都可以表示成MapReduce任务,都需要重新从磁盘加载数据。
  • 迭代式分析:用户很自然地需要加载数据集到内存中,多次对数据集做查询分析,但对于MapReduce任务而言,每次查询都是一次单独的MapReduce任务,需要重新从磁盘加载数据。

所以可以看到,其实上面两个问题的本质问题都是一个,而且看起来似乎很trivial,甚至有点疑惑为啥MapReduce不做这个优化,我的理解是MapReduce的主要目标还是在于解决scalability和fault tolerance两个问题,当时的系统都是disk-based的,而且事实证明,Spark想做的内存计算也并不是一件容易的事情。

编程模型

Spark提供了两种抽象,RDD(Resilient Distributed Dataset)和Parallel Operations(可以认为是作用在RDD上的计算)。此外Spark还提供两种特别的共享变量:广播变量和累加器。

RDD

RDD是一种只读的分布式数据对象,可以有四种构建RDD的方式

  • From a file,从文件系统里读取。
  • Parallelizing a scala collection,比如把scala的数组转化成几个分片分散到多台机器上。
  • Transforming an exsiting RDD,通过一些map操作做RDD转化。
  • Changing the persistence of an exsiting RDD 默认状态下RDD是lazy的,需要的时候会计算出来,然后在内存中被移除,但用户可以定义cache让它在内存中保存或者save持久化到文件系统上。

每个RDD都会记录自己是如何创建的,也就是血缘,帮助计算失败的场景下重新构建RDD。

每个RDD会实现下面几个方法

  • getPartitions,获取当前RDD的分区id列表。
  • getIterator(partition),遍历该分区下的数据。
  • getPreferredLocations(partition),用于任务调度实现数据局部性。

任务提交以后,每个任务会被发送到自己prefer的某个分区上,然后去遍历该分区的数据。

Parallel Operations

作用在RDD上的一些计算

  • reduce,把数据结果合并,不支持grouped reduce operation,比如group by或者join操作,reduce结果只能在driver上聚合。
  • collect,发送数据集中的所有数据到driver。
  • foreach,把每一个element经过一个用户定义的函数。

Shared variables

当Spark发送任务到worker节点的时候,本质是传递闭包,这些闭包包含了创建时可见的局部变量,这些变量也会拷贝到对应节点。为了易用性和性能方面需求,Spark提供两种类型的共享变量:

  • 广播变量,如果一个很大的数据在多个Parallel Operations里用到,最好就是在每个节点上有一份就好,而不是在每一个闭包里都打包进去,这样可以避免一台机器上有一份数据的多份拷贝,节省内存资源。
  • 累加器,这个变量是在worker和driver上都会有,worker只能按照某个操作往上累加,只有driver可以读,可以方便用来做求和运算。

用户创建一个广播变量以后,会被保存到分布式文件系统上面,序列化的部分仅仅是文件路径。

累加器在创建的时候会有一个唯一标识的ID,在每一个任务里面,累加器的一个副本会创建,同时将初始值设成0,最后把累加信息返回给driver,driver接收各分区的任务结果更新最终结果。

示例

来看一个逻辑回归的例子

val data = sc.textFile(...) // read data
var w = Vector.random(D)  // initialize w

for (i <- 1 to ITERATIONS) {
  val gradient = sc.accumulator(Vector.zeros(D))
  for (p <- sc.parallelize(data, numSlices)) { // run in parallel
    val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
    gradient +=  scale * p.x
  }
}
w -= gradient.value

定义gradient为累加器,for循环会启动一个foreach任务,按照数据分片进行并行计算,最后将结果累加到gradient上。

未来工作

作者最后规划了下未来的工作,很厉害的是这几点规划恰好是Spark未来重点突破的一些方向,而且都做成功了。

  • 正式归纳RDD的特点,说明其适合的workloads和应用场景。
  • 提升RDD的抽象化能力,让开发人员可以自己做存储空间和重新构建RDD之间的权衡。
  • 支持Shuffle操作,帮助实现像group by和join一类的操作。
  • 提供更好的交互式能力,比如SQL能力的支持

Reference