存在的问题

Google所面临的问题:大数据处理任务庞大,如何通过分布式系统完成并行计算、分发数据、处理错误?

为了解决这个问题,需要设计一个新的抽象模型,用来表述我们需要执行的计算。不用关心底层的实现细节,包括并行计算、容错、数据分布、负载均衡等方面。

编程模型

MapReduce编程模型的原理:利用一个输入的key/value pair集合来产生一个输出的key/value pair集合。

自定义的Map函数接受一个key/value pair输入,然后产生一个中间key/value pair集合。会将相同key和对应多个value值集合在一起传递给reduce函数。

自定义的Reduce函数接受上面的集合,合并这些value值,形成一个新的value值集合。

下面是一个计算大文档集合中每个单词出现的次数的案例:

map(String key, String value):
	//key : document name
	//value: document contents
	foreach word w in value:
		emitintermediate(w, "1");
		
reduce(String key, Iterator values):
	//key : a word
	//values: a list of counts
	int result = 0;
	foreach v in values:
		result += ParseInt(v);
	emit(AsString(result));

Map计函数计算文档中(word,count)这样的key/value pair,Reduce函数把Map函数产生的计数累加起来。

Map Reduce函数可以抽象成以下形式:

map(k1, v1) -> list(k2, v2)
reduce(k2, list(v2)) -> list(v2)

其他案例

  • 分布式的grep:Map输出匹配某个模式的一行,Reduce将中间数据处理再输出。
  • 计算URL访问频率:Map处理日志,输出(URL, 1),Reduce将中间数据累加处理,产生(URL, totalCount)再输出。

实现

将输入数据自动分割成M块数据片段,Map函数在多台机器上并行处理。Reduce调用也是在多台机器上并行处理。MapReduce实现的全部流程如图所示:

master数据结构

Master存储每个Map和Reduce任务的状态,以及Worker机器的标识。

Master像一个数据管道,存储已完成的Map任务的计算结果。并将这些信息推送给Reduce任务。

容错

  • worker故障
    • master会对worker进行心跳检测,如果在指定时间内没有收到返回信息,会将worker标记为失效。
    • 所有由这个失效worker完成的Map任务需要重新分配给其他的worker。
    • 当这个Map任务被调度到worker B执行时,会通知执行Reduce任务的worker从这台机器上读取数据。
  • master故障
    • 让master定期的将上述的数据结构写入磁盘,如果master出现故障,可以让新的master从这个checkpoint恢复。
    • 由于只有一个master进程,如果master失效,需要中止MapReduce运算。

存储位置

MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在有数据拷贝的机器上执行。这样可以减少网络带宽的消耗。

任务粒度

Map拆分成 M个片段,Reduce拆分成R个片段执行。

但是实际上对M和R的取值有一定的客观限制:

  • master必须执行$O(M + R)$次调度
  • 需要在内存中保存$O(M * R)$个状态

R由用户指定的,实际使用时需要选择合适的M,以使得每一个独立任务都处理大约16M~64M的输入数据。R值应该设置为worker机器数量的倍数。

备用任务

如果一个worker花了很长时间才完成最后几个Map或Reduce任务,会导致MapReduce操作总的执行时间超过预期。产生以上现象的原因有很多。

因此需要使用一个通用的机制来减少这种现象,当一个MapReduce操作接近完成的时候,master调度备用任务进程来处理剩下的任务。

改良的功能

分区函数

选择什么函数来进行对数据进行分区?

hash(key) mod R能产生非常平衡的分区,但是,其它分区函数对一些key的分区效果较好,例如输入的key值是URL,我们希望每个主机的所有条目保持在同一个输出文件中。

顺序保证

需要保证Reduce的生成顺序。

Combiner函数

某些情况下,Map函数产生的中间key值的重复数据会占很大的比重,因此,允许用户指定一个可选的combiner函数,该函数首先在本地将重复数据进行一次合并,然后再通过网络发送出去。

输入和输出的类型

可以预定义输入/输出的数据类型。

跳过损坏的记录

有时,用户程序中的Bug会导致Map/Reduce函数在处理记录的时候发生crash,常用的做法是修复Bug后再执行MapReduce操作。

但是很多时候,忽略一些有问题的记录是可以接受的,因此提供了一种执行模式,MapReduce会检测哪些记录导致了crash,并且会跳过这些记录不做处理。

每个worker进程都设置了信号处理函数来捕获内存段错误和总线错误。在执行MapReduce之前,通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数会通过UDP向master发送处理的最后一条记录的序号,当master看到处理某条记录失败多次后,master标记此条记录将不会被处理。

本地运行

需要本地进行debug来测试。

状态信息

master可以使用HTTP服务器来显示状态信息,用户可以监控各类信息。包括任务完成数量、任务处理数量、执行时间、资源占用等。

计数器

MapReduce提供计数功能,需要用户在程序中创建一个计数器对象,在Map/Reduce函数中相应的增加计数器的值。

计数器机制对于MapReduce操作的完整性检查十分有用。某些时候,用户需要确保输出的key/value pair精确等于输入的key/value pair