关闭

关闭

封号提示

内容

首页 mapreduce中文版.pdf

mapreduce中文版.pdf

mapreduce中文版.pdf

上传者: icefireelf 2012-01-25 评分 4.5 0 76 10 344 暂无简介 简介 举报

简介:本文档为《mapreduce中文版pdf》,可适用于IT/计算机领域,主题内容包含MapReduce:超大机群上的简单数据处理摘要MapReduce是一个编程模型,和处理,产生大数据集的相关实现用户指定一个map函数处理一个key符等。

MapReduce:超大机群上的简单数据处理摘要MapReduce是一个编程模型,和处理,产生大数据集的相关实现用户指定一个map函数处理一个keyvalue对,从而产生中间的keyvalue对集然后再指定一个reduce函数合并所有的具有相同中间key的中间value下面将列举许多可以用这个模型来表示的现实世界的工作以这种方式写的程序能自动的在大规模的普通机器上实现并行化这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信这样就可以让那些没有并行分布式处理系统经验的程序员利用大量分布式系统的资源我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的机群上,一个典型的MapReduce计算处理几千台机器上的以TB计算的数据程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,每天在Google的机群上都有多个MapReduce程序在执行介绍在过去的年里,作者和Google的许多人已经实现了数以百计的为专门目的而写的计算来处理大量的原始数据,比如,爬行的文档,Web请求日志,等等为了计算各种类型的派生数据,比如,倒排索引,Web文档的图结构的各种表示,每个主机上爬行的页面数量的概要,每天被请求数量最多的集合,等等很多这样的计算在概念上很容易理解然而,输入的数据量很大,并且只有计算被分布在成百上千的机器上才能在可以接受的时间内完成怎样并行计算,分发数据,处理错误,所有这些问题综合在一起,使得原本很简介的计算,因为要大量的复杂代码来处理这些问题,而变得让人难以处理作为对这个复杂性的回应,我们设计一个新的抽象模型,它让我们表示我们将要执行的简单计算,而隐藏并行化,容错,数据分布,负载均衡的那些杂乱的细节,在一个库里我们的抽象模型的灵感来自Lisp和许多其他函数语言的map和reduce的原始表示我们认识到我们的许多计算都包含这样的操作:在我们输入数据的逻辑记录上应用map操作,来计算出一个中间keyvalue对集,在所有具有相同key的value上应用reduce操作,来适当的合并派生的数据功能模型的使用,再结合用户指定的map和reduce操作,让我们可以非常容易的实现大规模并行化计算,和使用再次执行作为初级机制来实现容错这个工作的主要贡献是通过简单有力的接口来实现自动的并行化和大规模分布式计算,结合这个接口的实现来在大量普通的PC机上实现高性能计算第二部分描述基本的编程模型,并且给一些例子第三部分描述符合我们的基于集群的计算环境的MapReduce的接口的实现第四部分描述我们觉得编程模型中一些有用的技巧第五部分对于各种不同的任务,测量我们实现的性能第六部分探究在Google内部使用MapReduce作为基础来重写我们的索引系统产品第七部分讨论相关的,和未来的工作编程模型计算利用一个输入keyvalue对集,来产生一个输出keyvalue对集MapReduce库的用户用两个函数表达这个计算:map和reduce用户自定义的map函数,接受一个输入对,然后产生一个中间keyvalue对集MapReduce库把所有具有相同中间keyI的中间value聚合在一起,然后把它们传递给reduce函数用户自定义的reduce函数,接受一个中间keyI和相关的一个value集它合并这些value,形成一个比较小的value集一般的,每次reduce调用只产生或个输出value通过一个迭代器把中间value提供给用户自定义的reduce函数这样可以使我们根据内存来控制value列表的大小实例考虑这个问题:计算在一个大的文档集合中每个词出现的次数用户将写和下面类似的伪代码:map(Stringkey,Stringvalue):key:文档的名字value:文档的内容foreachwordwinvalue:EmitIntermediate(w,"")reduce(Stringkey,Iteratorvalues):key:一个词values:一个计数列表intresult=foreachvinvalues:result=ParseInt(v)Emit(AsString(resut))map函数产生每个词和这个词的出现次数(在这个简单的例子里就是)reduce函数把产生的每一个特定的词的计数加在一起另外,用户用输入输出文件的名字和可选的调节参数来填充一个mapreduce规范对象用户然后调用MapReduce函数,并把规范对象传递给它用户的代码和MapReduce库链接在一起(用C实现)附录A包含这个实例的全部文本类型即使前面的伪代码写成了字符串输入和输出的term格式,但是概念上用户写的map和reduce函数有关联的类型:map(k,v)>list(k,v)reduce(k,list(v))>list(v)例如,输入的key,value和输出的key,value的域不同此外,中间key,value和输出key,values的域相同我们的C实现传递字符串来和用户自定义的函数交互,并把它留给用户的代码,来在字符串和适当的类型间进行转换更多实例这里有一些让人感兴趣的简单程序,可以容易的用MapReduce计算来表示分布式的Grep(UNIX工具程序,可做文件内的字符串查找):如果输入行匹配给定的样式,map函数就输出这一行reduce函数就是把中间数据复制到输出计算URL访问频率:map函数处理web页面请求的记录,输出(URL,)reduce函数把相同URL的value都加起来,产生一个(URL,记录总数)的对倒转网络链接图:map函数为每个链接输出(目标,源)对,一个URL叫做目标,包含这个URL的页面叫做源reduce函数根据给定的相关目标URLs连接所有的源URLs形成一个列表,产生(目标,源列表)对每个主机的术语向量:一个术语向量用一个(词,频率)列表来概述出现在一个文档或一个文档集中的最重要的一些词map函数为每一个输入文档产生一个(主机名,术语向量)对(主机名来自文档的URL)reduce函数接收给定主机的所有文档的术语向量它把这些术语向量加在一起,丢弃低频的术语,然后产生一个最终的(主机名,术语向量)对倒排索引:map函数分析每个文档,然后产生一个(词,文档号)对的序列reduce函数接受一个给定词的所有对,排序相应的文档IDs,并且产生一个(词,文档ID列表)对所有的输出对集形成一个简单的倒排索引它可以简单的增加跟踪词位置的计算分布式排序:map函数从每个记录提取key,并且产生一个(key,record)对reduce函数不改变任何的对这个计算依赖分割工具(在描述)和排序属性(在描述)实现MapReduce接口可能有许多不同的实现根据环境进行正确的选择例如,一个实现对一个共享内存较小的机器是合适的,另外的适合一个大NUMA的多处理器的机器,而有的适合一个更大的网络机器的集合这部分描述一个在Google广泛使用的计算环境的实现:用交换机连接的普通PC机的大机群我们的环境是:Linux操作系统,双处理器,GB内存的机器普通的网络硬件,每个机器的带宽或者是百兆或者千兆,但是平均小于全部带宽的一半因为一个机群包含成百上千的机器,所有机器会经常出现问题存储用直接连到每个机器上的廉价IDE硬盘一个从内部文件系统发展起来的分布式文件系统被用来管理存储在这些磁盘上的数据文件系统用复制的方式在不可靠的硬件上来保证可靠性和有效性用户提交工作给调度系统每个工作包含一个任务集,每个工作被调度者映射到机群中一个可用的机器集上执行预览通过自动分割输入数据成一个有M个split的集,map调用被分布到多台机器上输入的split能够在不同的机器上被并行处理通过用分割函数分割中间key,来形成R个片(例如,hash(key)modR),reduce调用被分布到多台机器上分割数量(R)和分割函数由用户来指定图显示了我们实现的MapReduce操作的全部流程当用户的程序调用MapReduce的函数的时候,将发生下面的一系列动作(下面的数字和图中的数字标签相对应):在用户程序里的MapReduce库首先分割输入文件成M个片,每个片的大小一般从到MB(用户可以通过可选的参数来控制)然后在机群中开始大量的拷贝程序这些程序拷贝中的一个是master,其他的都是由master分配任务的worker有M个map任务和R个reduce任务将被分配管理者分配一个map任务或reduce任务给一个空闲的worker一个被分配了map任务的worker读取相关输入split的内容它从输入数据中分析出keyvalue对,然后把keyvalue对传递给用户自定义的map函数由map函数产生的中间keyvalue对被缓存在内存中缓存在内存中的keyvalue对被周期性的写入到本地磁盘上,通过分割函数把它们写入R个区域在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduceworker当一个reduceworker得到master的位置通知的时候,它使用远程过程调用来从mapworker的磁盘上读取缓存的数据当reduceworker读取了所有的中间数据后,它通过排序使具有相同key的内容聚合在一起因为许多不同的key映射到相同的reduce任务,所以排序是必须的如果中间数据比内存还大,那么还需要一个外部排序reduceworker迭代排过序的中间数据,对于遇到的每一个唯一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数reduce函数的输出被添加到这个reduce分割的最终的输出文件中当所有的map和reduce任务都完成了,管理者唤醒用户程序在这个时候,在用户程序里的MapReduce调用返回到用户代码在成功完成之后,mapreduce执行的输出存放在R个输出文件中(每一个reduce任务产生一个由用户指定名字的文件)一般,用户不需要合并这R个输出文件成一个文件他们经常把这些文件当作一个输入传递给其他的MapReduce调用,或者在可以处理多个分割文件的分布式应用中使用他们master数据结构master保持一些数据结构它为每一个map和reduce任务存储它们的状态(空闲,工作中,完成),和worker机器(非空闲任务的机器)的标识master就像一个管道,通过它,中间文件区域的位置从map任务传递到reduce任务因此,对于每个完成的map任务,master存储由map任务产生的R个中间文件区域的大小和位置当map任务完成的时候,位置和大小的更新信息被接受这些信息被逐步增加的传递给那些正在工作的reduce任务容错因为MapReduce库被设计用来使用成百上千的机器来帮助处理非常大规模的数据,所以这个库必须要能很好的处理机器故障worker故障master周期性的ping每个worker如果master在一个确定的时间段内没有收到worker返回的信息,那么它将把这个worker标记成失效因为每一个由这个失效的worker完成的map任务被重新设置成它初始的空闲状态,所以它可以被安排给其他的worker同样的,每一个在失败的worker上正在运行的map或reduce任务,也被重新设置成空闲状态,并且将被重新调度在一个失败机器上已经完成的map任务将被再次执行,因为它的输出存储在它的磁盘上,所以不可访问已经完成的reduce任务将不会再次执行,因为它的输出存储在全局文件系统中当一个map任务首先被workerA执行之后,又被B执行了(因为A失效了),重新执行这个情况被通知给所有执行reduce任务的worker任何还没有从A读数据的reduce任务将从workerB读取数据MapReduce可以处理大规模worker失败的情况例如,在一个MapReduce操作期间,在正在运行的机群上进行网络维护引起台机器在几分钟内不可访问了,MapReducemaster只是简单的再次执行已经被不可访问的worker完成的工作,继续执行,最终完成这个MapReduce操作master失败可以很容易的让管理者周期的写入上面描述的数据结构的checkpoints如果这个master任务失效了,可以从上次最后一个checkpoint开始启动另一个master进程然而,因为只有一个master,所以它的失败是比较麻烦的,因此我们现在的实现是,如果master失败,就中止MapReduce计算客户可以检查这个状态,并且可以根据需要重新执行MapReduce操作在错误面前的处理机制当用户提供的map和reduce操作对它的输出值是确定的函数时,我们的分布式实现产生,和全部程序没有错误的顺序执行一样,相同的输出我们依赖对map和reduce任务的输出进行原子提交来完成这个性质每个工作中的任务把它的输出写到私有临时文件中一个reduce任务产生一个这样的文件,而一个map任务产生R个这样的文件(一个reduce任务对应一个文件)当一个map任务完成的时候,worker发送一个消息给master,在这个消息中包含这R个临时文件的名字如果master从一个已经完成的map任务再次收到一个完成的消息,它将忽略这个消息否则,它在master的数据结构里记录这R个文件的名字当一个reduce任务完成的时候,这个reduceworker原子的把临时文件重命名成最终的输出文件如果相同的reduce任务在多个机器上执行,多个重命名调用将被执行,并产生相同的输出文件我们依赖由底层文件系统提供的原子重命名操作来保证,最终的文件系统状态仅仅包含一个reduce任务产生的数据我们的map和reduce操作大部分都是确定的,并且我们的处理机制等价于一个顺序的执行的这个事实,使得程序员可以很容易的理解程序的行为当map或和reduce操作是不确定的时候,我们提供虽然比较弱但是合理的处理机制当在一个非确定操作的前面,一个reduce任务R的输出等价于一个非确定顺序程序执行产生的输出然而,一个不同的reduce任务R的输出也许符合一个不同的非确定顺序程序执行产生的输出考虑map任务M和reduce任务R,R的情况我们设定e(Ri)为已经提交的Ri的执行(有且仅有一个这样的执行)这个比较弱的语义出现,因为e(R)也许已经读取了由M的执行产生的输出,而e(R)也许已经读取了由M的不同执行产生的输出存储位置在我们的计算机环境里,网络带宽是一个相当缺乏的资源我们利用把输入数据(由GFS管理)存储在机器的本地磁盘上来保存网络带宽GFS把每个文件分成MB的一些块,然后每个块的几个拷贝存储在不同的机器上(一般是个拷贝)MapReduce的master考虑输入文件的位置信息,并且努力在一个包含相关输入数据的机器上安排一个map任务如果这样做失败了,它尝试在那个任务的输入数据的附近安排一个map任务(例如,分配到一个和包含输入数据块在一个switch里的worker机器上执行)当运行巨大的MapReduce操作在一个机群中的一部分机器上的时候,大部分输入数据在本地被读取,从而不消耗网络带宽任务粒度象上面描述的那样,我们细分map阶段成M个片,reduce阶段成R个片M和R应当比worker机器的数量大许多每个worker执行许多不同的工作来提高动态负载均衡,也可以加速从一个worker失效中的恢复,这个机器上的许多已经完成的map任务可以被分配到所有其他的worker机器上在我们的实现里,M和R的范围是有大小限制的,因为master必须做O(MR)次调度,并且保存O(M*R)个状态在内存中(这个因素使用的内存是很少的,在O(M*R)个状态片里,大约每个map任务reduce任务对使用一个字节的数据)此外,R经常被用户限制,因为每一个reduce任务最终都是一个独立的输出文件实际上,我们倾向于选择M,以便每一个单独的任务大概都是到MB的输入数据(以便上面描述的位置优化是最有效的),我们把R设置成我们希望使用的worker机器数量的小倍数我们经常执行MapReduce计算,在M=,R=,使用台工作者机器的情况下备用任务一个落后者是延长MapReduce操作时间的原因之一:一个机器花费一个异乎寻常地的长时间来完成最后的一些map或reduce任务中的一个有很多原因可能产生落后者例如,一个有坏磁盘的机器经常发生可以纠正的错误,这样就使读性能从MBs降低到MBs机群调度系统也许已经安排其他的任务在这个机器上,由于计算要使用CPU,内存,本地磁盘,网络带宽的原因,引起它执行MapReduce代码很慢我们最近遇到的一个问题是,一个在机器初始化时的Bug引起处理器缓存的失效:在一个被影响的机器上的计算性能有上百倍的影响我们有一个一般的机制来减轻这个落后者的问题当一个MapReduce操作将要完成的时候,master调度备用进程来执行那些剩下的还在执行的任务无论是原来的还是备用的执行完成了,工作都被标记成完成我们已经调整了这个机制,通常只会占用多几个百分点的机器资源我们发现这可以显著的减少完成大规模MapReduce操作的时间作为一个例子,将要在描述的排序程序,在关闭掉备用任务的情况下,要比有备用任务的情况下多花的时间技巧尽管简单的map和reduce函数的功能对于大多数需求是足够的了,但是我们开发了一些有用的扩充这些将在这个部分描述分割函数MapReduce用户指定reduce任务和reduce任务需要的输出文件的数量在中间key上使用分割函数,使数据分割后通过这些任务一个缺省的分割函数使用hash方法(例如,hash(key)modR)这个导致非常平衡的分割然后,有的时候,使用其他的key分割函数来分割数据有非常有用的例如,有时候,输出的key是URLs,并且我们希望每个主机的所有条目保持在同一个输出文件中为了支持像这样的情况,MapReduce库的用户可以提供专门的分割函数例如,使用"hash(Hostname(urlkey))modR"作为分割函数,使所有来自同一个主机的URLs保存在同一个输出文件中顺序保证我们保证在一个给定的分割里面,中间keyvalue对以key递增的顺序处理这个顺序保证可以使每个分割产出一个有序的输出文件,当输出文件的格式需要支持有效率的随机访问key的时候,或者对输出数据集再作排序的时候,就很容易combiner函数在某些情况下,允许中间结果key重复会占据相当的比重,并且用户定义的reduce函数满足结合律和交换律一个很好的例子就是在部分的词统计程序因为词频率倾向于一个zipf分布(齐夫分布),每个map任务将产生成百上千个这样的记录<the,>所有的这些计数将通过网络被传输到一个单独的reduce任务,然后由reduce函数加在一起产生一个数字我们允许用户指定一个可选的combiner函数,先在本地进行合并一下,然后再通过网络发送在每一个执行map任务的机器上combiner函数被执行一般的,相同的代码被用在combiner和reduce函数在combiner和reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出reduce函数的输出被保存最终输出文件里combiner函数的输出被写到中间文件里,然后被发送给reduce任务部分使用combiner可以显著的提高一些MapReduce操作的速度附录A包含一个使用combiner函数的例子输入输出类型MapReduce库支持以几种不同的格式读取输入数据例如,文本模式输入把每一行看作是一个keyvalue对key是文件的偏移量,value是那一行的内容其他普通的支持格式以key的顺序存储keyvalue对序列每一个输入类型的实现知道怎样把输入分割成对每个单独的map任务来说是有意义的(例如,文本模式的范围分割确保仅仅在每行的边界进行范围分割)虽然许多用户仅仅使用很少的预定意输入类型的一个,但是用户可以通过提供一个简单的reader接口来支持一个新的输入类型一个reader不必要从文件里读数据例如,我们可以很容易的定义它从数据库里读记录,或从内存中的数据结构读取副作用有的时候,MapReduce的用户发现在map操作或和reduce操作时产生辅助文件作为一个附加的输出是很方便的我们依靠应用程序写来使这个副作用成为原子的一般的,应用程序写一个临时文件,然后一旦这个文件全部产生完,就自动的被重命名对于单个任务产生的多个输出文件来说,我们没有提供其上的两阶段提交的原子操作支持因此,一个产生需要交叉文件连接的多个输出文件的任务,应该使确定性的任务不过这个限制在实际的工作中并不是一个问题跳过错误记录有的时候因为用户的代码里有bug,导致在某一个记录上map或reduce函数突然crash掉这样的bug使得MapReduce操作不能完成虽然一般是修复这个bug,但是有时候这是不现实的也许这个bug是在源代码不可得到的第三方库里有的时候也可以忽略一些记录,例如,当在一个大的数据集上进行统计分析我们提供一个可选的执行模式,在这个模式下,MapReduce库检测那些记录引起的crash,然后跳过那些记录,来继续执行程序每个worker程序安装一个信号处理器来获取内存段异常和总线错误在调用一个用户自定义的map或reduce操作之前,MapReduce库把记录的序列号存储在一个全局变量里如果用户代码产生一个信号,那个信号处理器就会发送一个包含序号的"lastgasp"UDP包给MapReduce的master当master不止一次看到同一个记录的时候,它就会指出,当相关的map或reduce任务再次执行的时候,这个记录应当被跳过本地执行调试在map或reduce函数中问题是很困难的,因为实际的计算发生在一个分布式的系统中,经常是有一个master动态的分配工作给几千台机器为了简化调试和测试,我们开发了一个可替换的实现,这个实现在本地执行所有的MapReduce操作用户可以控制执行,这样计算可以限制到特定的map任务上用户以一个标志调用他们的程序,然后可以容易的使用他们认为好用的任何调试和测试工具(例如,gdb)状态信息master运行一个HTTP服务器,并且可以输出一组状况页来供人们使用状态页显示计算进度,象多少个任务已经完成,多少个还在运行,输入的字节数,中间数据字节数,输出字节数,处理百分比,等等这个页也包含到标准错误的链接,和由每个任务产生的标准输出的链接用户可以根据这些数据预测计算需要花费的时间,和是否需要更多的资源当计算比预期的要慢很多的时候,这些页面也可以被用来判断是不是这样此外,最上面的状态页显示已经有多少个工作者失败了,和当它们失败的时候,那个map和reduce任务正在运行当试图诊断在用户代码里的bug时,这个信息也是有用的计数器MapReduce库提供一个计数器工具,来计算各种事件的发生次数例如,用户代码想要计算所有处理的词的个数,或者被索引的德文文档的数量为了使用这个工具,用户代码创建一个命名的计数器对象,然后在map或和reduce函数里适当的增加计数器例如:Counter*uppercaseuppercase=GetCounter("uppercase")map(Stringname,Stringcontents):foreachwordwincontents:if(IsCapitalized(w)):uppercase>Increment()EmitIntermediate(w,"")来自不同worker机器上的计数器值被周期性的传送给master(在ping回应里)master把来自成功的map和reduce任务的计数器值加起来,在MapReduce操作完成的时候,把它返回给用户代码当前计数器的值也被显示在master状态页里,以便人们可以查看实际的计算进度当计算计数器值的时候消除重复执行的影响,避免数据的累加(在备用任务的使用,和由于出错的重新执行,可以产生重复执行)有些计数器值被MapReduce库自动的维护,比如,被处理的输入keyvalue对的数量,和被产生的输出keyvalue对的数量用户发现计数器工具对于检查MapReduce操作的完整性很有用例如,在一些MapReduce操作中,用户代码也许想要确保输出对的数量完全等于输入对的数量,或者处理过的德文文档的数量是在全部被处理的文档数量中属于合理的范围性能在本节,我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能一个计算用来在一个大概TB的数据中查找特定的匹配串另一个计算排序大概TB的数据这两个程序代表了MapReduce的用户实现的真实的程序的一个大子集一类是,把数据从一种表示转化到另一种表示另一类是,从一个大的数据集中提取少量的关心的数据机群配置所有的程序在包含大概台机器的机群上执行机器的配置是:个G的IntelXeon超线程处理器,GB内存,两个GBIDE磁盘,一个千兆网卡这些机器部署在一个由两层的,树形交换网络中,在根节点上大概有到G的带宽所有这些机器都有相同的部署(对等部署),因此任意两点之间的来回时间小于毫秒在GB的内存里,大概有GB被用来运行在机群中其他的任务这个程序是在周末的下午开始执行的,这个时候CPU,磁盘,网络基本上是空闲的Grep这个Grep程序扫描大概^个,每个字节的记录,查找比较少的字符的查找串(这个查找串出现在个记录中)输入数据被分割成大概MB的片(M=),全部的输出存放在一个文件中(R=)图显示计算过程随时间变化的情况Y轴表示输入数据被扫描的速度随着更多的机群被分配给这个MapReduce计算,速度在逐步的提高,当有个worker的时候这个速度达到最高的GBs当map任务完成的时候,速度开始下降,在计算开始后秒,输入的速度降到这个计算持续的时间大概是秒这包括了前面大概一分钟的启动时间启动时间用来把程序传播到所有的机器上,等待GFS打开个输入文件,得到必要的位置优化信息排序这个sort程序排序^个记录,每个记录个字节(大概TB的数据)这个程序是模仿TeraSort的这个排序程序只包含不到行的用户代码其中有行map函数用来从文本行提取字节的排序key,并且产生一个由这个key和原始文本行组成的中间keyvalue对我们使用一个内置的Identity函数作为reduce操作这个函数直接把中间keyvalue对作为输出的keyvalue对最终的排序输出写到一个路复制的GFS文件中(也就是,程序的输出会写TB的数据)象以前一样,输入数据被分割成MB的片(M=)我们把排序后的输出写到个文件中(R=)分区函数使用key的原始字节来把数据分区到R个小片中我们以这个基准的分割函数,知道key的分布情况在一般的排序程序中,我们会增加一个预处理的MapReduce操作,这个操作用于采样key的情况,并且用这个采样的key的分布情况来计算对最终排序处理的分割点。图(a)显示这个排序程序的正常执行情况左上图显示输入数据的读取速度这个速度最高到达GBs,并且在不到秒所有map任务完成之后迅速滑落到注意到这个输入速度小于Grep这是因为这个排序map任务花费大概一半的时间和带宽,来把中间数据写到本地硬盘中而Grep相关的中间数据可以忽略不计左中图显示数据通过网络从map任务传输给reduce任务的速度当第一个map任务完成后,这个排序过程就开始了图示上的第一个高峰是启动了第一批大概个reduce任务(整个MapReduce任务被分配到台机器上,每个机器一次只执行一个reduce任务)大概开始计算后的秒,第一批reduce任务中的一些完成了,我们开始执行剩下的reduce任务全部的排序过程持续了大概秒的时间左下图显示排序后的数据被reduce任务写入最终文件的速度因为机器忙于排序中间数据,所以在第一个排序阶段的结束和写阶段的开始有一个延迟写的速度大概是GBs大概开始计算后的秒写过程结束包括前面的启动过程,全部的计算任务持续的秒这个和TeraSortbenchmark的最高纪录秒差不多需要注意的事情是:因此位置优化的原因,很多数据都是从本地磁盘读取的而没有通过我们有限带宽的网络,所以输入速度比排序速度和输出速度都要快排序速度比输出速度快的原因是输出阶段写两个排序后数据的拷贝(我们写两个副本的原因是为了可靠性和可用性)我们写两份的原因是因为底层文件系统的可靠性和可用性的要求如果底层文件系统用类似容错编码(erasurecoding)的方式,而不采用复制写的方式,在写盘阶段可以降低网络带宽的要求。备用任务的影响在图(b)中,显示我们不用备用任务的排序程序的执行情况除了它有一个很长的几乎没有写动作发生的尾巴外,执行流程和图(a)相似在秒后,只有个reduce任务没有完成然而,就是这最后几个落后者知道秒后才完成全部的计算任务执行了秒,多花了的时间机器失效在图(c)中,显示我们有意的在排序程序计算过程中停止台worker中的台机器上的程序的情况底层机群调度者在这些机器上马上重新开始新的worker程序(因为仅仅程序被停止,而机器仍然在正常运行)因为已经完成的map工作丢失了(由于相关的mapworker被杀掉了),需要重新再作所以worker死掉会导致一个负数的输入速率相关map任务的重新执行很快就重新执行了整个计算过程在秒内完成,包括了前边的启动时间(只比正常执行时间多了的时间)经验我们在年的月写了MapReduce库的第一个版本,并且在年的月做了显著的增强,包括位置优化,worker机器间任务执行的动态负载均衡,等等从那个时候起,我们惊奇的发现MapReduce函数库广泛用于我们日常处理的问题它现在在Google内部各个领域内广泛应用,包括:大规模机器学习问题GoogleNews和Froogle产品的机器问题提取数据产生一个流行查询的报告(例如,GoogleZeitgeist)为新的试验和产品提取网页的属性(例如,从一个web页的大集合中提取位置信息用在位置查询)大规模的图计算图显示了我们主要的源代码管理系统中,随着时间推移,MapReduce程序的显著增加,从年早先时候的个增长到年月份的差不多个不同的程序MapReduce之所以这样的成功,是因为他能够在不到半小时时间内写出一个简单的能够应用于上千台机器的大规模并发程序,并且极大的提高了开发和原形设计的周期效率并且,他可以让一个完全没有分布式和或并行系统经验的程序员,能够很容易的利用大量的资源在每一个任务结束的时候,MapReduce函数库记录使用的计算资源的统计信息在图里,我们列出了年月份在Google运行的一些MapReduce的工作的统计信息大规模索引到目前为止,最成功的MapReduce的应用就是重写了Googleweb搜索服务所使用到的index系统索引系统处理爬虫系统抓回来的超大量的文档集,这些文档集保存在GFS文件里这些文档的原始内容的大小,超过了TB索引程序是通过一系列的,大概到次MapReduce操作来建立索引通过利用MapReduce(替换掉上一个版本的特别设计的分布处理的索引程序版本)有这样一些好处:索引的代码简单,量少,容易理解,因为容错,分布式,并行处理都隐藏在MapReduce库中了例如,当使用MapReduce函数库的时候,计算的代码行数从原来的行C代码一下减少到大概行代码MapReduce的函数库的性能已经非常好,所以我们可以把概念上不相关的计算步骤分开处理,而不是混在一起以期减少在数据上的处理这使得改变索引过程很容易例如,我们对老索引系统的一个小更改可能要好几个月的时间,但是在新系统内,只需要花几天时间就可以了索引系统的操作更容易了,这是因为机器的失效,速度慢的机器,以及网络失效都已经由MapReduce自己解决了,而不需要操作人员的交互另外,我们可以简单的通过对索引系统增加机器的方式提高处理性能相关工作很多系统都提供了严格的设计模式,并且通过对编程的严格限制来实现自动的并行计算例如,一个结合函数可以通过N个元素的数组的前缀在N个处理器上使用并行前缀计算在logN的时间内计算完MapReduce是基于我们的大型现实计算的经验,对这些模型的一个简化和精炼并且,我们还提供了基于上千台处理器的容错实现而大部分并发处理系统都只在小规模的尺度上实现,并且机器的容错还是程序员来控制的BulkSynchronousProgramming以及一些MPIprimitives提供了更高级别的抽象,可以更容易写出并行处理的程序这些系统和MapReduce系统的不同之处在,MapReduce利用严格的编程模式自动实现用户程序的并发处理,并且提供了透明的容错处理我们本地的优化策略是受activedisks等技术的启发,在activedisks中,计算任务是尽量推送到靠近本地磁盘的处理单元上,这样就减少了通过IO子系统或网络的数据量我们在少量磁盘直接连接到普通处理机运行,来代替直接连接到磁盘控制器的处理机上,但是一般的步骤是相似的我们的备用任务的机制和在Charlotte系统上的积极调度机制相似这个简单的积极调度的一个缺陷是,如果一个任务引起了一个重复性的失败,那个整个计算将无法完成我们通过在故障情况下跳过故障记录的机制,在某种程度上解决了这个问题MapReduce实现依赖一个内置的机群管理系统来在一个大规模共享机器组上分布和运行用户任务虽然这个不是本论文的重点,但是集群管理系统在理念上和Condor等其他系统是一样的在MapReduce库中的排序工具在操作上和NOWSort相似源机器(mapworker)分割将要被排序的数据,然后把它发送到R个reduceworker中的一个上每个reduceworker来本地排序它的数据(如果可能,就在内存中)当然,NOWSort没有用户自定义的map和reduce函数,使得我们的库可以广泛的应用River提供一个编程模型,在这个模型下,处理进程可以靠在分布式的队列上发送数据进行彼此通讯和MapReduce一样,River系统尝试提供对不同应用有近似平均的性能,即使在不对等的硬件环境下或者在系统颠簸的情况下也能提供近似平均的性River是通过精心调度硬盘和网络的通讯,来平衡任务的完成时间MapReduce不和它不同利用严格编程模型,MapReduce构架来把问题分割成大量的任务这些任务被自动的在可用的worker上调度,以便速度快的worker可以处理更多的任务这个严格编程模型也让我们可以在工作快要结束的时候安排冗余的执行,来在非一致处理的情况减少完成时间(比如,在有慢机或者阻塞的worker的时候)BADFS是一个很MapReduce完全不同的编程模型,它的目标是在一个广阔的网络上执行工作然而,它们有两个基本原理是相同的()这两个系统使用冗余的执行来从由失效引起的数据丢失中恢复()这两个系统使用本地化调度策略,来减少通过拥挤的网络连接发送的数据数量TACC是一个被设计用来简化高有效性网络服务结构的系统和MapReduce一样,它通过再次执行来实现容错结束语MapReduce编程模型已经在Google成功的用在不同的目的我们把这个成功归于以下几个原因:第一,这个模型使用简单,甚至对没有并行和分布式经验的程序员也是如此,因为它隐藏了并行化,容错,位置优化和负载均衡的细节第二,大量不同的问题可以用MapReduce计算来表达例如,MapReduce被用来,为Google的产品web搜索服务,排序,数据挖掘,机器学习,和其他许多系统,产生数据第三,我们已经在一个好几千台计算机的大型集群上开发实现了这个MapReduce这个实现使得对于这些机器资源的利用非常简单,因此也适用于解决Google遇到的其他很多需要大量计算的问题从这个工作中我们也学习到了一些东西首先,严格的编程模型使得并行化和分布式计算简单,并且也易于构造这样的容错计算环境第二,网络带宽是系统的瓶颈因此在我们的系统中大量的优化目标是减少通过网络发送的数据量,本地优化使用我们从本地磁盘读取数据,并且把中间数据写到本地磁盘,以保留网络带宽第三,冗余的执行可以用来减少速度慢的机器的影响,和控制机器失效和数据丢失感谢JoshLevenberg校定和扩展了用户级别的MapReduceAPI,并且结合他的适用经验和其他人的改进建议,增加了很多新的功能MapReduce从GFS中读取和写入数据我们要感谢MohitAron,HowardGobioff,MarkusGutschke,DavidKrame,ShunTakLeung,和JoshRedstone,他们在开发GFS中的工作我们还感谢PercyLiangOlcanSercinoglu在开发用于MapReduce的集群管理系统得工作MikeBurrows,WilsonHsieh,JoshLevenberg,SharonPerl,RobPike,DebbyWallach为本论文提出了宝贵的意见OSDI的无名审阅者,以及我们的审核者EricBrewer,在论文应当如何改进方面给出了有益的意见最后,我们感谢Google的工程部的所有MapReduce的用户,感谢他们提供了有用的反馈,建议,以及错误报告等等A单词频率统计本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率#include"mapreducemapreduceh"用户map函数classWordCounter:publicMapper{public:virtualvoidMap(constMapInputinput){conststringtext=inputvalue()constintn=textsize()for(inti=i<n){跳过前导空格while((i<n)isspace(texti))i查找单词的结束位置intstart=iwhile((i<n)!isspace(texti))iif(start<i)Emit(textsubstr(start,istart),"")}}}REGISTERMAPPER(WordCounter)用户的reduce函数classAdder:publicReducer{virtualvoidReduce(ReduceInput*input){迭代具有相同key的所有条目,并且累加它们的valueintvalue=while(!input>done()){value=StringToInt(input>value())input>NextValue()}提交这个输入key的综合Emit(IntToString(value))}}REGISTERREDUCER(Adder)intmain(intargc,char**argv){ParseCommandLineFlags(argc,argv)MapReduceSpecificationspec把输入文件列表存入"spec"for(inti=i<argci){MapReduceInput*input=specaddinput()input>setformat("text")input>setfilepattern(argvi)input>setmapperclass("WordCounter")}指定输出文件:gfstestfreqofgfstestfreqofMapReduceOutput*out=specoutput()out>setfilebase("gfstestfreq")out>setnumtasks()out>setformat("text")out>setreducerclass("Adder")可选操作:在map任务中做部分累加工作,以便节省带宽out>setcombinerclass("Adder")调整参数:使用台机器,每个任务MB内存specsetmachines()specsetmapmegabytes()specsetreducemegabytes()运行它MapReduceResultres

类似资料

编辑推荐

中华学生百科全书:激光技术.pdf

中华学生百科全书:结识动物.pdf

中华学生百科全书:结识植物.pdf

西方政治思想史第5卷二战以来.pdf

中华学生百科全书:宇宙星空.pdf

职业精品

精彩专题

上传我的资料

精选资料

热门资料排行换一换

  • 力学名着译丛 理论流体动力学.p…

  • 魔鬼咨询师搭讪文章汇总.pdf

  • Lowrider Girls 顶…

  • 伟大的抗美援朝运动(人民出版社 …

  • 四库全书.经部.易类.132.周…

  • 金属学与热处理.pdf

  • 禽病诊治彩色图谱.pdf

  • 庭园生物药用精华 庭园药用动物2…

  • Critical Thinkin…

  • 资料评价:

    / 14
    所需积分:1 立即下载

    意见
    反馈

    返回
    顶部