首页 分布式计算开源框架Hadoop入门实践

分布式计算开源框架Hadoop入门实践

举报
开通vip

分布式计算开源框架Hadoop入门实践 分布式计算开源框架 Hadoop入门实践 作者介绍:岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及 阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有 什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。 一、分布式计算开源框架 Hadoop实践 在 SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模 式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设...

分布式计算开源框架Hadoop入门实践
分布式计算开源框架 Hadoop入门实践 作者介绍:岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及 阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有 什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。 一、分布式计算开源框架 Hadoop实践 在 SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模 式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与 实现》中有所提到。但是由于统计的内容暂时还是十分简单,所以就采用Memcache作为 计数器,结合 MySQL就完成了访问控制以及统计的工作。然而未来,对于海量日志分析的 工作,还是需要有所准备。现在最火的技术词汇莫过于“云计算”,在 Open API日益盛行的 今天,互联网应用的数据将会越来越有价值,如何去分析这些数据,挖掘其内在价值,就需 要分布式计算来支撑海量数据的分析工作。 回过头来看,早先那种多线程,多任务分解的日志分析设计,其实是分布式计算的一个单机 版缩略,如何将这种单机的工作进行分拆,变成协同工作的集群,其实就是分布式计算框架 设计所涉及的。在去年参加 BEA大会的时候,BEA和 VMWare合作采用虚拟机来构建集群, 无非就是希望使得计算机硬件能够类似于应用程序中资源池的资源,使用者无需关心资源的 分配情况,从而最大化了硬件资源的使用价值。分布式计算也是如此,具体的计算任务交由 哪一台机器执行,执行后由谁来汇总,这都由分布式框架的 Master来抉择,而使用者只需 简单地将待分析内容提供给分布式计算系统作为输入,就可以得到分布式计算后的结果。 Hadoop是 Apache开源组织的一个分布式计算开源框架,在很多大型网站上都已经得到了 应用,如亚马逊、Facebook和 Yahoo等等。 对于我来说,最近的一个使用点就是服务集 成平台的日志分析。服务集成平台的日志量将会很大,而这也正好符合了分布式计算的适用 场景(日志分析和索引建立就 是两大应用场景)。 当前没有正式确定使用,所以也是自己业余摸索,后续所写的相关内容,都是一个新手的学 习过程,难免会有一些错误,只是希望 记录 混凝土 养护记录下载土方回填监理旁站记录免费下载集备记录下载集备记录下载集备记录下载 下来可以分享给更多志同道合的朋友。 什么是 Hadoop? 搞什么东西之前,第一步是要知道What(是什么),然后是Why(为什么),最后才是 H ow(怎么做)。但很多开发的朋友在做了多年项目以后,都习惯是先 How,然后What, 最后才是Why,这样只会让自己变得浮躁,同时往往会将技术误用于不适合的场景。 Hadoop框架中最核心的设计就是:MapReduce和 HDFS。MapReduce的 思想 教师资格思想品德鉴定表下载浅论红楼梦的主题思想员工思想动态调查问卷论语教育思想学生思想教育讲话稿 是由 Google 的一篇论文所提及而被广为流传的, 简单的一句话解释 MapReduce就是“任务的分解与结 果的汇总”。HDFS是 Hadoop分布式文件系统(Hadoop Distributed File System)的缩写, 为分布式计算存储提供了底层支持。 MapReduce从它名字上来看就大致可以看出个缘由,两个动词 Map和 Reduce,“Map(展 开)”就是将一个任务分解成为多个任 务,“Reduce”就是将分解后多任务处理的结果汇总起 来,得出最后的分析结果。这不是什么新思想,其实在前面提到的多线程,多任务的设计就 可以找到这 种思想的影子。不论是现实社会,还是在程序设计中,一项工作往往可以被拆 分成为多个任务,任务之间的关系可以分为两种:一种是不相关的任务,可以并行执 行; 另一种是任务之间有相互的依赖,先后顺序不能够颠倒,这类任务是无法并行处理的。回到 大学时期,教授上课时让大家去分析关键路径,无非就是找最省时的 任务分解执行方式。 在分布式系统中,机器集群就可以看作硬件资源池,将并行的任务拆分,然后交由每一个空 闲机器资源去处理,能够极大地提高计算效率,同时 这种资源无关性,对于计算集群的扩 展无疑提供了最好的设计保证。(其实我一直认为 Hadoop的卡通图标不应该是一个小象, 应该是蚂蚁,分布式计算就好比 蚂蚁吃大象,廉价的机器群可以匹敌任何高性能的计算机, 纵向扩展的曲线始终敌不过横向扩展的斜线)。任务分解处理以后,那就需要将处理以后的 结果再汇总起 来,这就是 Reduce要做的工作。 图 1:MapReduce结构示意图 上图就是 MapReduce大致的结构图,在Map前还可能会对输入的数据有 Split(分割)的 过程,保证任务并行效率,在Map之后还会有 Shuffle(混合)的过程,对于提高 Reduce 的效率以及减小数据传输的压力有很大的帮助。后面会具体提及这些部分的细节。 HDFS是分布式计算的存储基石,Hadoop的分布式文件系统和其他分布式文件系统有很多 类似的特质。分布式文件系统基本的几个特点: 1. 对于整个集群有单一的命名空间。 2. 数据一致性。适合一次写入多次读取的模型,客户端在文件没有被成功创建之前无 法看到文件存在。 3. 文件会被分割成多个文件块,每个文件块被分配存储到数据节点上,而且根据配置 会由复制文件块来保证数据的安全性。 图 2:HDFS结构示意图 上图中展现了整个 HDFS三个重要角色:NameNode、DataNode和 Client。NameNode可 以看作是分布式文件系统中的管理 者,主要负责管理文件系统的命名空间、集群配置信息 和存储块的复制等。NameNode会将文件系统的 Meta-data存储在内存中,这些信息主要 包括 了文件信息、每一个文件对应的文件块的信息和每一个文件块在 DataNode的信息等。 DataNode是文件存储的基本单元,它将 Block存储在本地 文件系统中,保存了 Block的 M eta-data,同时周期性地将所有存在的 Block信息发送给 NameNode。Client就是需要获取 分布式文 件系统文件的应用程序。这里通过三个操作来说明他们之间的交互关系。 文件写入: 1. Client向 NameNode发起文件写入的请求。 2. NameNode根据文件大小和文件块配置情况,返回给 Client它所管理部分 DataNod e的信息。 3. Client将文件划分为多个 Block,根据 DataNode的地址信息,按顺序写入到每一个 DataNode块中。 文件读取: 1. Client向 NameNode发起文件读取的请求。 2. NameNode返回文件存储的 DataNode的信息。 3. Client读取文件信息。 文件 Block复制: 1. NameNode发现部分文件的 Block不符合最小复制数或者部分 DataNode失效。 2. 通知 DataNode相互复制 Block。 3. DataNode开始直接相互复制。 最后再说一下 HDFS的几个设计特点(对于框架设计值得借鉴): 1. Block的放置:默认不配置。一个 Block会有三份备份,一份放在 NameNode指定 的 DataNode,另一份放在 与指定 DataNode非同一 Rack上的 DataNode,最后一 份放在与指定 DataNode同一 Rack上的 DataNode上。备份无非就是为了 数据安 全,考虑同一 Rack的失败情况以及不同 Rack之间数据拷贝性能问题就采用这种配 置方式。 2. 心跳检测 DataNode的健康状况,如果发现问题就采取数据备份的方式来保证数据 的安全性。 3. 数 据复制(场景为 DataNode失败、需要平衡 DataNode的存储利用率和需要平衡 DataNode数据交互压力等情况):这里先说一下,使用 HDFS的 balancer命令,可 以配置一个 Threshold来平衡每一个 DataNode磁盘利用率。例如设置了 Threshol d为 10%,那么 执行 balancer命令的时候,首先统计所有 DataNode的磁盘利用 率的均值,然后判断如果某一个 DataNode的磁盘利用率超过这个均值 Threshold 以上,那么将会把这个 DataNode的 block转移到磁盘利用率低的 DataNode,这对 于新节点的加入来说十分有用。 4. 数据交验:采用 CRC32作数据交验。在文件 Block写入的时候除了写入数据还会写 入交验信息,在读取的时候需要交验后再读入。 5. NameNode是单点:如果失败的话,任务处理信息将会纪录在本地文件系统和远端 的文件系统中。 6. 数 据管道性的写入:当客户端要写入文件到 DataNode上,首先客户端读取一个 B lock然后写到第一个 DataNode上,然后由第一个 DataNode传递到备份的 DataN ode上,一直到所有需要写入这个 Block的 NataNode都成功写入,客户端才会继续 开始写下一个 Block。 7. 安全模式:在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文 件系统处于安全模式的情况下,文 件系统中的内容不允许修改也不允许删除,直到 安全模式结束。安全模式主要是为了系统启动的时候检查各个 DataNode上数据块 的有效性,同时根据策略必 要的复制或者删除部分数据块。运行期通过命令也可以 进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式 不允许修改的出错提示,只需要等待一会儿即可。 下面综合 MapReduce和 HDFS来看 Hadoop的结构: 图 3:Hadoop结构示意图 在 Hadoop的系统中,会有一台Master,主要负责 NameNode的工作以及 JobTracker的工 作。JobTracker的主要职责 就是启动、跟踪和调度各个 Slave的任务执行。还会有多台 Sl ave,每一台 Slave通常具有 DataNode的功能并负责 TaskTracker的 工作。TaskTracker 根据应用要求来结合本地数据执行 Map任务以及 Reduce任务。 说到这里,就要提到分布式计算最重要的一个设计点:Moving Computation is Cheaper t han Moving Data。就是在分布式处理中,移动数据的代价总是高于转移计算的代价。简单 来说就是分而治之的工作,需要将数据也分而存储,本地任务处理本地数据然后归 总,这 样才会保证分布式计算的高效性。 为什么要选择 Hadoop? 说完了What,简单地说一下Why。官方网站已经给了很多的说明,这里就大致说一下其优 点及使用的场景(没有不好的工具,只用不适用的工具,因此选择好场景才能够真正发挥分 布式计算的作用): 1. 可扩展:不论是存储的可扩展还是计算的可扩展都是 Hadoop的设计根本。 2. 经济:框架可以运行在任何普通的 PC上。 3. 可靠:分布式文件系统的备份恢复机制以及 MapReduce的任务监控保证了分布式处 理的可靠性。 4. 高效:分布式文件系统的高效数据交互实现以及MapReduce结合 Local Data处理 的模式,为高效处理海量的信息作了基础准备。 使用场景:个人觉得最适合的就是海量数据的分析,其实 Google最早提出 MapReduce也 就是为了海量数据分析。同时 HDFS最早是为了搜索引擎实现而开发的,后来才被用于分布 式计算框架中。海量数据被分割于多个节点,然后由每一个节点并行计算,将得出的结 果 归并到输出。同时第一阶段的输出又可以作为下一阶段计算的输入,因此可以想象到一个树 状结构的分布式计算图,在不同阶段都有不同产出,同时并行和串行结 合的计算也可以很 好地在分布式集群的资源下得以高效的处理。 二、Hadoop中的集群配置和使用技巧 其实参看 Hadoop官方文档已经能够很容易配置分布式框架运行环境了,不过这里既然写了 就再多写一点,同时有一些细节需要注意的也说明一下,其实 也就是这些细节会让人摸索 半天。Hadoop可以单机跑,也可以配置集群跑,单机跑就不需要多说了,只需要按照 De mo的运行说明直接执行命令即可。这里 主要重点说一下集群配置运行的过程。 环境 7台普通的机器,操作系统都是 Linux。内存和 CPU就不说了,反正 Hadoop一大特点就是 机器在多不在精。JDK必须是 1.5以上的,这个切记。7台机器的机器名务必不同,后续会 谈到机器名对于 MapReduce有很大的影响。 部署考虑 正如上面我描述的,对于 Hadoop的集群来说,可以分成两大类角色:Master和 Slave,前 者主要配置 NameNode和 JobTracker的角色,负责总管分布式数据和分解任务的执行,后 者配置 DataNode和 TaskTracker的角色,负责分布式数据存储以及任 务的执行。本来我 打算看看一台机器是否可以配置成 Master,同时也作为 Slave使用,不过发现在 NameNod e初始化的过程中以及 TaskTracker执行过程中机器名配置好像有冲突(NameNode和 Tas kTracker对于 Hosts的配置有些冲突,究竟是把机器名对应 IP放在配置前面还是把 Local host对应 IP放在前面有点问题,不过可能也是我自己的问题吧,这个大家可以根据实施情 况给我反馈)。最后反正决定一 台 Master,六台 Slave,后续复杂的应用开发和测试结果 的比对会增加机器配置。 实施步骤 1. 在所有的机器上都建立相同的目录,也可以就建立相同的用户,以该用户的 home 路径来做 hadoop的安装路径。例如我在所有的机器上都建立了/home/wenchu。 2. 下载 Hadoop,先解压到 Master上。这里我是下载的 0.17.1的版本。此时 Hadoop 的安装路径就是/home/wenchu/hadoop-0.17.1。 3. 解压后进入 conf目录,主要需要修改以下文件:hadoop-env.sh,hadoop-site. xml、masters、slaves。 Hadoop的基础配置文件是 hadoop-default.xml,看 Hadoop的代码可以知道, 默认建立一个 Job的时候会建立 Job的 Config,Config首先读入 hadoop-default. xml的配置,然后再读入 hadoop-site.xml的配置(这个文件初始的时候配置为 空),hadoop-site.xml中主要配置你需要覆盖的 hadoop-default.xml的系 统级配置,以及你需要在你的 MapReduce过程中使用的自定义配置(具体的一些使 用例如 final等参考文档)。 以下是一个简单的 hadoop-site.xml的配置: fs.default.name//你的 namenode的配置,机器名加端 口 hdfs://10.2.224.46:54310/ mapred.job.tracker//你的 JobTracker的配置,机器 名加端口 hdfs://10.2.224.46:54311/ dfs.replication//数据需要备份的数量,默认是三 1 hadoop.tmp.dir//Hadoop的默认临时路径,这个最好 配置,如果在新 增节点或者其他情况下莫名其妙的 DataNode启动不了,就删除此文件中 的 tmp目录即可。不过如果删除了 NameNode机器的此目录,那么就需要 重新执行 NameNode格式化的命令。 /home/wenchu/hadoop/tmp/ mapred.child.java.opts//java虚拟机的一些参数可 以参照配置 -Xmx512m dfs.block.size//block的大小,单位字节,后面会提 到用处,必须是 512的倍数,因为采用 crc作文件完整性校验,默认配置 512是 checksu m的最小单元。 5120000 The default block size for new files. hadoop-env.sh文件只需要修改一个参数: # The java implementation to use. Required. export JAVA_HOME=/usr/ali/jdk1.5.0_10 配置你的 Java路径,记住一定要 1.5版本以上,免得莫名其妙出现问题。 Masters中配置 Masters的 IP或者机器名,如果是机器名那么需要在/etc/hosts 中有所设置。Slaves中配置的是 Slaves的 IP或者机器名,同样如果是机器名需要 在/etc/hosts中有所设置。范例如下,我这里配置的都是 IP: Masters: 10.2.224.46 Slaves: 10.2.226.40 10.2.226.39 10.2.226.38 10.2.226.37 10.2.226.41 10.2.224.36 4. 建立 Master到每一台 Slave的 SSH受信证书。由于Master将会通过 SSH启动所有 Slave的 Hadoop,所以需要建立单向或者双向证书保证命令执行时不需要再输入密 码。在 Master和所有的 Slave机器上执行:ssh-keygen -t rsa。执行此命令的 时候,看到提示只需要回车。然后就会在/root/.ssh/下面产生 id_rsa.pub的证 书文件,通过 scp将 Master机器上的这个文件拷贝到 Slave上(记得修改名称), 例如:scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa. pub,然后执行 cat /root/.ssh/46_rsa.pub >>/root/.ssh/authorized_k eys,建立 authorized_keys文 件即可,可以打开这个文件看看,也就是 rsa的 公钥作为 key,user@IP作为 value。此时可以试验一下,从master ssh到 slave已 经不需要密码了。由 slave反向建立也是同样。为什么要反向呢?其实如果一直都 是 Master启动和关闭的话那么没有必要建立反 向,只是如果想在 Slave也可以关 闭 Hadoop就需要建立反向。 5. 将 Master上的 Hadoop通过 scp拷贝到每一个 Slave相同的目录下,根据每一个 Sl ave的 Java_HOME的不同修改其 hadoop-env.sh。 6. 修改 Master上/etc/profile: 新增以下内容:(具体的内容根据你的安装路径修改,这步只是为了方便使用) export HADOOP_HOME=/home/wenchu/hadoop-0.17.1 export PATH=$PATH:$HADOOP_HOME/bin 修改完毕后,执行 source /etc/profile来使其生效。 7. 在 Master上执行 Hadoop namenode –format,这是第一需要做的初始化,可以 看作格式化吧,以后除了在上面我提到过删除了Master上的 hadoop.tmp.dir目 录,否则是不需要再次执行的。 8. 然后执行 Master上的 start-all.sh,这个命令可以直接执行,因为在 6中已经添 加到了 path路径,这个命令是启动 hdfs和mapreduce两部分,当然你也可以分开 单独启动 hdfs和mapreduce,分别是 bin目录下的 start-dfs.sh和 start-map red.sh。 9. 检查 Master的 logs目录,看看 Namenode日志以及 JobTracker日志是否正常启动。 10. 检查 Slave的 logs目录看看 Datanode日志以及 TaskTracker日志是否正常。 11. 如果需要关闭,那么就直接执行 stop-all.sh即可。 以上步骤就可以启动Hadoop的分布式环境,然后在Master的机器进入Master的安装目录, 执行 hadoop jar hadoop-0.17.1-examples.jar wordcount输入路径和输出路径, 就可以看到字数统计的效果了。此处的输入路径和输出路径都指的是 HDFS中的路径,因此 你可以首先通过拷贝本地文件系统中的目录到 HDFS中的方式来建立 HDFS中的输入路径: hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。其中/home/wenc hu/test-in是本地路径,test-in是将会建立在 HDFS中的路径,执行完毕以后可以通 过 hadoop dfs –ls看到 test-in目录已经存在,同时可以通过 hadoop dfs –ls tes t-in查看里面的内容。输出路径要求是在 HDFS中不存在的,当执行完那个 demo以后, 就可以通过 hadoop dfs –ls 输出路径看到其中的内容,具体文件的内容可以通过 had oop dfs –cat文件名称来查看。 经验 总结 初级经济法重点总结下载党员个人总结TXt高中句型全总结.doc高中句型全总结.doc理论力学知识点总结pdf 和注意事项(这部分是我在使用过程中花了一些时间走的弯路): 1. Master和 Slave上的几个 conf配置文件不需要全部同步,如果确定都是通过 Maste r去启动和关闭,那么 Slave机器上的配置不需要去维护。但如果希望在任意一台机 器都可以启动和关闭 Hadoop,那么就需要全部保持一致了。 2. Master和 Slave机器上的/etc/hosts中 必须把集群中机器都配置上去,就算在各 个配置文件中使用的是 IP。这个吃过不少苦头,原来以为如果配成 IP就不需要去配 置 Host,结果发现在执行 Reduce的时候总是卡住,在拷贝的时候就无法继续下去, 不断重试。另外如果集群中如果有两台机器的机器名如果重复也会出现问题。 3. 如果在新增了节点或者删除节点的时候出现了问题,首先就去删除 Slave的 hadoo p.tmp.dir,然后重新启动试试看,如果还是不行那就干脆把 Master的 hadoop. tmp.dir删除(意味着 dfs上的数据也会丢失),如果删除了 Master的 hadoop.t mp.dir,那么就需要重新 namenode –format。 4. Map任务个数以及 Reduce任务个数配置。前面分布式文件系统设计提到一个文件 被放入到分布式文件系统中,会被分割成多个 block放置到每一个的 DataNode上, 默认 dfs.block.size应该是 64M,也就是说如果你放置到 HDFS上的数据小于 6 4,那么将只有一个 Block,此时会被放置到某一个 DataNode中,这个可以通过使 用命令:hadoop dfsadmin –report就可以看到各个节点存储的情况。也可以 直接去某一个 DataNode查看目录:hadoop.tmp.dir/dfs/data/current就 可 以看到那些 block了。Block的数量将会直接影响到 Map的个数。当然可以通过配 置来设定 Map和 Reduce的任务个数。Map的个数通常默认 和 HDFS需要处理的 b locks相同。也可以通过配置Map的数量或者配置minimum split size来设定,实 际的个数为:max(min(block_size,data/#maps),min_split_size)。Reduc e可以通过这个 公式 小学单位换算公式大全免费下载公式下载行测公式大全下载excel公式下载逻辑回归公式下载 计算:0.95*num_nodes*mapred.tasktracker.tasks.max imum。 总的来说出了问题或者启动的时候最好去看看日志,这样心里有底。 Hadoop中的命令(Command)总结 这部分内容其实可以通过命令的 Help以及介绍了解,我主要侧重于介绍一下我用的比较多 的几个命令。Hadoop dfs 这个命令后面加参数就是对于 HDFS的操作,和 Linux操作系统 的命令很类似,例如: · Hadoop dfs –ls就是查看/usr/root目录下的内容,默认如果不填路径这就是当 前用户路径; · Hadoop dfs –rmr xxx就是删除目录,还有很多命令看看就很容易上手; · Hadoop dfsadmin –report这个命令可以全局的查看 DataNode的情况; · Hadoop job后面增加参数是对于当前运行的 Job的操作,例如 list,kill等; · Hadoop balancer就是前面提到的均衡磁盘负载的命令。 其他就不详细介绍了。 三、Hadoop基本 流程 快递问题件怎么处理流程河南自建厂房流程下载关于规范招聘需求审批流程制作流程表下载邮件下载流程设计 与应用开发 Hadoop基本流程 一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。 1. 在分布式环境中客户端创建任务并提交。 2. InputFormat做 Map前的预处理,主要负责以下工作: 1. 验证输入的格式是否符合 JobConfig的输入定义,这个在实现 Map和构建 C onf的时候就会知道,不定义可以是Writable的任意子类。 2. 将 input的文件切分为逻辑上的输入 InputSplit,其实这就是在上面提到的 在分布式文件系统中 blocksize是有大小限制的,因此大文件会被划分为多 个 block。 3. 通过 RecordReader来再次处理 inputsplit为一组 records,输出给 Map。(i nputsplit只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要 RecordReader来实现,例如最简单的默认方式就是回车换行的切分) 3. RecordReader处理后的结果作为 Map的输入,Map执行定义的Map逻辑,输出处 理后的 key和 value对应到临时中间文件。 4. Combiner可选择配置,主要作用是在每一个 Map执行完分析以后,在本地优先作 R educe的工作,减少在 Reduce过程中的数据传输量。 5. Partitioner可选择配置,主要作用是在多个 Reduce的情况下,指定 Map的结果由 某一个 Reduce处理,每一个 Reduce都会有单独的输出文件。(后面的代码实例中 有介绍使用场景) 6. Reduce执行具体的业务逻辑,并且将处理结果输出给 OutputFormat。 7. OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否 如 Config中配置,最后输出 Reduce汇总后的结果。 业务场景和代码范例 业务场景描述:可设定输入和输出路径(操作系统的路径非 HDFS路径),根据访问日志分 析某一个应用访问某一个 API的总次数和总流量,统计后分别输出到两个文件中。这里仅仅 为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。 测试代码类图 LogAnalysiser就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类 用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断: LogAnalysiser::MapClass public static class MapClass extends MapReduceBase implements Mapper { public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { String line = value.toString();//没有配置 RecordReader,所 以默认采用 line 的实现,key就是行号,value就是行内容 if (line == null || line.equals("")) return; String[] words = line.split(","); if (words == null || words.length < 8) return; String appid = words[1]; String apiName = words[2]; LongWritable recbytes = new LongWritable(Long.parseLong(w ords[7])); Text record = new Text(); record.set(new StringBuffer("flow::").append(appid) .append("::").append(apiName).toString()); reporter.progress(); //输出流量的统计结果,通过 flow::作为前缀来标示。output.col lect(record, recbytes); record.clear(); record.set(new StringBuffer("count::").append(appid).appe nd("::") .append(apiName).toString()); //输出次数的统计结果,通过 count::作为前缀来标示 output.collect(record, new LongWritable(1)); } } LogAnalysiser:: PartitionerClass public static class PartitionerClass implements Partitioner { public int getPartition(Text key, LongWritable value, int num Partitions) { if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统 计分配到不同的 Reduce if (key.toString().startsWith("flow::")) return 0; else return 1; else return 0; } public void configure(JobConf job){} } LogAnalysiser:: CombinerClass 参看 ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在 R educeClass中蓝色的行表示在 CombinerClass中不存在。 LogAnalysiser:: ReduceClass public static class ReduceClass extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter)throws IOException { Text newkey = new Text(); newkey.set(key.toString().substring(key.toString().indexO f("::")+2)); LongWritable result = new LongWritable(); long tmp = 0; int counter = 0; while(values.hasNext())//累加同一个 key的统计结果 { tmp = tmp + values.next().get(); counter = counter +1;//担心处理太久,JobTracker长时间 没有收到报告会认为 TaskTracker已经失效,因此定时报告一下 if (counter == 1000) { counter = 0; reporter.progress(); } } result.set(tmp); output.collect(newkey, result);//输出最后的汇总结果 } } LogAnalysiser public static void main(String[] args) { try { run(args); } catch (Exception e) { e.printStackTrace(); } } public static void run(String[] args) throws Exception { if (args == null || args.length <2) { System.out.println("need inputpath and outputp ath"); return; } String inputpath = args[0]; String outputpath = args[1]; String shortin = args[0]; String shortout = args[1]; if (shortin.indexOf(File.separator) >= 0) shortin = shortin.substring(shortin.lastIndexO f(File.separator)); if (shortout.indexOf(File.separator) >= 0) shortout = shortout.substring(shortout.lastInd exOf(File.separator)); SimpleDateFormat formater = new SimpleDateFormat("yyy y.MM.dd"); shortout = new StringBuffer(shortout).append("-") .append(formater.format(new Date())).toString (); if (!shortin.startsWith("/")) shortin = "/" + shortin; if (!shortout.startsWith("/")) shortout = "/" + shortout; shortin = "/user/root" + shortin; shortout = "/user/root" + shortout; File inputdir = new File(inputpath); File outputdir = new File(outputpath); if (!inputdir.exists() || !inputdir.isDirectory()) { System.out.println("inputpath not exist or is n’t dir!"); return; } if (!outputdir.exists()) { new File(outputpath).mkdirs(); } JobConf conf = new JobConf(new Configuration(),LogAna lysiser.class);//构建 Config FileSystem fileSys = FileSystem.get(conf); fileSys.copyFromLocalFile(new Path(inputpath), new Pa th(shortin));//将本地文件系统的文件拷贝到 HDFS中 conf.setJobName("analysisjob"); conf.setOutputKeyClass(Text.class);//输出的 key类型, 在 OutputFormat会检查 conf.setOutputValueClass(LongWritable.class); //输出 的 value类型,在 OutputFormat会检查 conf.setMapperClass(MapClass.class); conf.setCombinerClass(CombinerClass.class); conf.setReducerClass(ReduceClass.class); conf.setPartitionerClass(PartitionerClass.class); conf.set("mapred.reduce.tasks", "2");//强制需要有两个 Reduce来分别处理流量和次数的统计 FileInputFormat.setInputPaths(conf, shortin);//hdfs中 的输入路径 FileOutputFormat.setOutputPath(conf, new Path(shortou t));//hdfs中输出路径 Date startTime = new Date(); System.out.println("Job started: " + startTime); JobClient.runJob(conf); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTim e() - startTime.getTime()) /1000 + " seconds."); //删除输入和输出的临时文件 fileSys.copyToLocalFile(new Path(shortout),new Path(o utputpath)); fileSys.delete(new Path(shortin),true); fileSys.delete(new Path(shortout),true); } 以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务 Class为一 个可标示的命令,让 hadoop jar可以执行。 public class ExampleDriver { public static void main(String argv[]){ ProgramDriver pgd = new ProgramDriver(); try { pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log ."); pgd.driver(argv); } catch(Throwable e){ e.printStackTrace(); } } } 将代码打成 jar,并且设置 jar的mainClass为 ExampleDriver这个类。在分布式环境启动以 后执行如下语句: hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenc hu/test-out 在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括 了 Map和 Reduce的进度。执行完毕会 在/home/wenchu/test-out下看到输出的内容。有 两个文件:part-00000和 part-00001分别记录了统计后的结果。 如果需要看执行的具体情 况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的 Map,Redu ce 的创建情况以及执行情况。在运行期也可以通过浏览器来查看 Map,Reduce的情况:htt p://MasterIP:50030 /jobtracker.jsp Hadoop集群测试 首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看 集群的效果,以及一些参数配置的影响。 文件复制数为 1,blocksize 5M Slave数 处理记录数(万条) 执行时间(秒) 2 95 38 2 950 337 4 95 24 4 950 178 6 95 21 6 950 114 Blocksize 5M Slave数 处理记录数(万条) 执行时间(秒) 2(文件复制数为 1) 950 337 2(文件复制数为 3) 950 339 6(文件复制数为 1) 950 114 6(文件复制数为 3) 950 117 文件复制数为 1 Slave数 处理记录数(万条) 执行时间(秒) 6(blocksize 5M) 95 21 6(blocksize 77M) 95 26 4(blocksize 5M) 950 178 4(blocksize 50M) 950 54 6(blocksize 5M) 950 114 6(blocksize 50M) 950 44 6(blocksize 77M) 950 74 测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点: 1. 机器数对于性能还是有帮助的(等于没说^_^)。 2. 文件复制数的增加只对安全性有帮助,但是对于性能没有太多帮助。而且现在采取 的是将操作系统文件拷贝到 HDFS中,所以备份多了,准备的时间很长。 3. blocksize对于性能影响很大,首先如果将 block划分的太小,那么将会增加 job的 数量,同时也增加了协作的代价,降低了性能,但是配置的太大也会让 job不能最 大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。 4. 最后就是除了这个表里面列出来的结果,应该去仔细
本文档为【分布式计算开源框架Hadoop入门实践】,请使用软件OFFICE或WPS软件打开。作品中的文字与图均可以修改和编辑, 图片更改请在作品中右键图片并更换,文字修改请直接点击文字进行修改,也可以新增和删除文档中的内容。
该文档来自用户分享,如有侵权行为请发邮件ishare@vip.sina.com联系网站客服,我们会及时删除。
[版权声明] 本站所有资料为用户分享产生,若发现您的权利被侵害,请联系客服邮件isharekefu@iask.cn,我们尽快处理。
本作品所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用。
网站提供的党政主题相关内容(国旗、国徽、党徽..)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。
下载需要: 免费 已有0 人下载
最新资料
资料动态
专题动态
is_653562
暂无简介~
格式:pdf
大小:294KB
软件:PDF阅读器
页数:0
分类:互联网
上传时间:2011-12-30
浏览量:16