澳门金沙国际,Hadoop-2.4.1学习之Mapper和Reducer

MapReduce允许技术员能够轻松地编写并行运转在大面积集群上管理多量数额的先后,确定保证程序的运营稳定可信和享有容错管理技术。程序猿编写的运转在MapReduce上的应用程序称为作业(job),Hadoop既帮衬用Java编写的job,也支撑任何语言编写的学业,举个例子Hadoop
Streaming(shell、python)和Hadoop
Pipes(c++)。Hadoop-2.X不再保留Hadoop-一.X版本中的JobTracker和TaskTracker组件,但那并不代表Hadoop-2.X不再协助MapReduce作业,相反Hadoop-二.X通过唯一的主ResourceManager、各类节点二个的从NodeManager和每一种应用程序3个的MRAppMaster保留了对MapReduce作业的向后万分。在新本子中MapReduce作业依然由Map和Reduce任务组成,Map还是接收由MapReduce框架将输入数据分割为数据块,然后Map任务以完全并行的办法管理那些数据块,接着MapReduce框架对Map任务的输出举行排序,并将结果做为Reduce职责的输入,最终由Reduce任务输出最后的结果,在总体实践进度中MapReduce框架担负职责的调整,监察和控制和再一次试行倒闭的任务等。

经常计算节点和存款和储蓄节点是千篇壹律的,MapReduce框架会使得地将职务布署在存款和储蓄数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过完成恐怕一连合适的接口或类提供了map和reduce函数,那多少个函数负担Map职责和Reduce职责。作业客户端将编写制定好的作业提交给ResourceManager,而不再是JobTracker,ResourceManager担任将作业分布到从节点上,调节和监督检查作业,为作业客户端提供情形和检查判断消息。

MapReduce框架只管理<key,
value>键值对,约等于将作业的输入视为一些键值对并出口键值对。做为键值的类必须能够被MapReduce框架体系化,因而供给落成Writable接口,常用的IntWritable,LongWritable和Text都以得以完结该接口的类。做为键的类除却要促成Writable接口外,还亟需完毕WritableComparable接口,达成该接口首要为了拉动排序,下面提到的多个类也都完结了该接口。

在大约介绍了MapReduce框架后,下边长远学习框架中的多个重大约念:Mapper和Reducer,正如上文提到了,它们组成了MapReduce作业并担任落成实际的作业逻辑管理。

Mapper是独自的天职,将输入记录转变为中等记录,即对输入的键值对进展拍卖,并出口为1组中间键值对,输出的键值对利用context.write(WritableComparable,
Writable)方法搜集起来,中间记录的键值类型不必与输入记录的键值类型一样,实际上也再三是见仁见智的。一条输入记录经由Mapper管理后或许输出为0条要么多条个中记录。举个例子,如果输入记录不满意专门的学业必要(未有包罗特定的值或许隐含了特定的值)的话,能够直接回到,则会输出0条记下,此时Mapper起了过滤器的效益。

随之MapReduce框架将与给定键相关联的富有中等值分组,然后传递给Reducer。用户能够透过Job.setGroupingComparatorClass(Class)方法钦点Comparator来决定分组。Mapper的输出被排序然后遵照Reducer分区,总的分区数与作业运转的Reducer职责数一样,程序猿能够透过兑现自定义的Partitioner调节输出的笔录由哪些Reducer管理,暗中同意使用的是HashPartitioner。程序猿还能通过Job.setCombinerClass(Class)内定八个combiner来实施中间输出的本土聚合,那有助于减少Mapper到Reducer的数码传输。Mapper的中级输出经过排序后连连保存为(key-len,
key,value-len,
value)的格式,应用程序能够通过Configuration调整是不是将中间输出举办削减,以及利用何种压缩格局,相关的多少个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。技术员通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable,
Writable,
Context)处理该职责的市场总值对,应用程序能够覆盖cleanup(Context)方法达成其余供给的清理工科作。

MapReduce框架为各种由作业的InputFormat生成的InputSplit运行三个map任务,因而总的map任务数量由输入数据大小决定,更标准说是由输入文件总的块数决定。即使可以为较少使用CPU的map职务在节点上设置300个map职分,但各种节点更合乎并行运转十-一百个map任务。由于任务的起步须求开支一些时光,所以职分的运作最佳至少需求一分钟,因为只要职分运维的时光很少,整个作业的年华将大多消耗在职务的创设地方。

Reducer将持有相同键的壹组中间值下降为壹组越来越小数目的值,比方合并单词的数码等。多个学业运维的Reducer数量能够经过Job.setNumReduceTasks(int)也许mapred-site.xml中的参数mapreduce.job.reduces设置,不过更推荐前者,因为能够由技术员决定运行多少个reducer,而后人越多的是提供了壹种暗中认可值。程序员使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key,
(list of values)>调用reduce(WritableComparable,
Iterable<Writable>,
Context)方法,同Mapper一样,程序猿也得以覆盖cleanup(Context)方法钦点需求的清理专门的学问。

Reducer的管理进度首要包蕴三个级次:shuffle(洗牌)、sort(分类)和reduce。在shuffle阶段,MapReduce框架通过HTTP获取具备Mapper输出的有关分区。在Sort阶段,框架依照键分组Reducer的输入(差别的mapper大概输出一样的键)。Shuffle和sort是同时举行的,获取Mapper的出口后然后联合它们。在reduce阶段,调用reduce(WritableComparable,
Iterable<Writable>管理<key, (list of
values)>对。Reducer的出口平时经过Context.write(WritableComparable,Writable)写入文件系统,比方HDFS,当然也得以经过应用DBOutputFormat将出口写入数据库。Reducer的输出是未经排序的。

假设不要求Reducer,能够利用Job.setNumReduceTasks(int)将Reducer的多寡设置为0(假如不选取该格局设置Reducer的数据,由于mapreduce.job.reduces默感到一,会运行多个Reducer),在那种地方下,Mapper的输出将直接写入FileOutputFormat.setOutputPath(Job,Path)钦定的不2诀要中,并且MapReduce框架不会对Mapper的输出进行排序。

假诺在进行reduce在此以前想采用与分组中间键时差别的可比规则,能够通过Job.setSortComparatorClass(Class)钦点不一致的Comparator。也正是Job.setGroupingComparatorClass(Class)调控了怎么着对中间输出分组,而Job.setSortComparatorClass(Class)调整了在将数据传入reduce在此之前开展的第叁回分组。

分歧于Mapper的数量由输入文件的尺寸鲜明,Reducer的数目可以由程序猿显明设置,那么设置有些Reducer能够高达较好地成效啊?Reducer的数额限制为:(0.九伍
~1.75 ) * 节点数量 *
每一种节点上最大的容器数。参数yarn.scheduler.minimum-allocation-mb设置了各类容器可伸手的小不点儿内部存款和储蓄器,那么最大容器数可依附总的内存除以该参数总计得出。当使用0.75时,全体的Reducer会被随即加载,并当Mapper落成时初阶传输Mapper的输出。使用一.7伍时,非常快的节点将不负众望它们第2轮的职务,然后加载第1波义务,那样对负载平衡具备更加好的魔法。扩展Reducer的数目固然增添了框架费用,但扩张了负载平衡和下跌了惜败的基金。上边的比重因子比总的Reducer数量稍微少许,感到预测推行的天职和倒闭的天职保留少些的Reducer槽,也等于事实上的Reducer数量为地点公式得出的数据增进保留的Reducer数量。

CentOS安装和布署Hadoop2.2.0 

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 1二.十 +Hadoop 1.二.一版本集群配置

Ubuntu上搭建Hadoop景况(单机格局+伪分布形式)

Ubuntu下Hadoop情状的布局

单机版搭建Hadoop境遇图像和文字教程详解

搭建Hadoop情形(在Winodws景况下用编造机虚拟四个Ubuntu系统举行搭建)

MapReduce允许程序猿能够轻巧地编写并行运转在广大集群上管理多量数据的主次,确认保障程序的运行稳固可信和…

思索难点

MapReduce允许技士能够轻易地编写并行运行在广大集群上拍卖大量数目标次第,确认保障程序的周转稳固可相信和持有容错管理技艺。程序员编写的运转在MapReduce上的应用程序称为作业(job),Hadoop既扶助用Java编写的job,也支撑任何语言编写的学业,比如Hadoop
Streaming(shell、python)和Hadoop
Pipes(c++)。Hadoop-二.X不再保留Hadoop-壹.X版本中的JobTracker和TaskTracker组件,但那并不代表Hadoop-2.X不再帮助MapReduce作业,相反Hadoop-二.X通过唯壹的主ResourceManager、各个节点四个的从NodeManager和各样应用程序四个的MRAppMaster保留了对MapReduce作业的向后格外。在新本子中MapReduce作业照旧由Map和Reduce职务组成,Map如故接收由MapReduce框架将输入数据分割为数据块,然后Map职务以完全并行的点子管理那么些数据块,接着MapReduce框架对Map义务的输出进行排序,并将结果做为Reduce职责的输入,最终由Reduce职责输出最后的结果,在任何推行进度中MapReduce框架肩负义务的调节,监控和再度实践倒闭的职分等。

               
依然那句话,看别人写的的连天认为心累,代码一贴,一打包,扔到Hadoop上跑三遍就完了了????写个测试样例程序(MapReduce中的Hello
World)还要那样麻烦!!!?,还本地打Jar包,传到Linux上,最终再用jar命令运转jar包敲三回in和out参数,作者去,小编是不堪了,小编很捉急,澳门金沙国际 1

MapReduce总结

习感到常总结节点和积攒节点是千篇一律的,MapReduce框架会有效地将任务布置在蕴藏数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过完毕或许连续合适的接口或类提供了map和reduce函数,这四个函数担负Map职务和Reduce任务。作业客户端将编制好的作业提交给ResourceManager,而不再是JobTracker,ResourceManager负担将作业分布到从节点上,调整和监察作业,为作业客户端提供意况和检查判断信息。

                我就想清楚MapReduce的劳作规律,而知道原理后,作者就想在本地用Java程序跑二回全数MapReduce的计量进度,那一个很难啊?
搜遍全网,没察觉多少个是和煦想要的(也有非常大恐怕漏掉了),都以能够参见的,但是零零散散不对胃口,不切合入门级“游戏者”像自己同1的人,最终,下定狠心,看摄像搜罗素材,从头到尾的捋一下MapReduce的规律,以及怎么着在本地,通过编写制定map和reduce函数对三个文书文件中的单词进行出现次数的总结,并将结果输出到HDFS文件系统上

MapReduce

  1. MapReduce的定义
    MapReduce是壹种编制程序模型,
    用于周围数据集(大于一TB)的竞相运算。它将分布式磁盘读写的标题开始展览抽象,并转移为对一个数据集(由键/值对组合)的总结,该总计由
    mapreduce 两某个组成。

  2. MapReduce编程模型流行的四个本领上边的缘故:

  • MapReduce接纳无共享大规模集群系统。集群系统具备优良的性价比和可伸缩性。
  • MapReduce模型简单、易于明白、易于使用。它不只用于拍卖大规模数据,而且能够将过多累赘的底细隐藏起来(比方,自动化并行、负载均衡和灾备管理等),一点都不小地简化了技士的支出职业,而且多量数码管理难题,包罗不少机器学习和数据开采算法,都能够应用MapReduce完毕。
  • 就算如此基本的MapReduce模型只提供1个进度性的编制程序接口,但在海量数据情况、要求保险可伸缩性的前提下,通过应用方便的询问优化和目录手艺,MapReduce还是可以够提供很好的数额管理品质。
  1. MapReduce和关系型数据库的比较
MapReduce和关系型数据库之间的另一个区别在于他们所操作的数据集的结构化的程度,这在第一篇的文章已经讨论过。  
需要强调的是,MapReduce输入的键和值并不是数据固有的属性,而是由分析数据的人员来选择的。
  1. MapReduce的特点:
  • 软件框架
  • 并行管理
  • 保险且容错
  • 科学普及集群
  • 海量数据集
  1. Map 和 Reduce
    Hadoop框架使用Mapper将数据处理成三个个的<key,value>键值对,在网络节点间对其开始展览重新整建(shuffle),然后利用Reducer管理多少并拓展末段输出。
  • Map
    映射的意趣,简单的说正是把3个输入映射为1组(多个)全新的数额,而不去更改原有的多少。
    Mapper担负的是。把复杂的天任务解为多少个“简单的任务”来管理。“轻松的职分”包蕴三层意思:

    1. 多少或总结的框框相对原职务要大大压缩
    2. 左右总括标准,任务会分配到存放着所需数据的节点上举行测算
    3. 这么些小任务可以并行总计相互间大约从未正视关系
  • Reduce
    化简的情趣,正是把通过Map获得的1组数据通过有些方法(化简)归百分之十想要的输出值。

    所以说,MapReduce的想想正是:分而治之

MapReduce框架只管理<key,
value>键值对,也正是将作业的输入视为一些键值对并出口键值对。做为键值的类必须能够被MapReduce框架体系化,因而要求达成Writable接口,常用的IntWritable,LongWritable和Text都以实现该接口的类。做为键的类除了这些之外要兑现Writable接口外,还亟需贯彻WritableComparable接口,落成该接口主要为了推动排序,上边提到的八个类也都得以完结了该接口。

 

MapReduce的劳作体制

MapReduce运维图解一

MapReduce运维图解二

下边,大家来分析MapReduce作业的运转机制

  1. 作业的提交:客户端通过JobClient.runJob()来交付多少个学业到jobtracker,JobClient程序逻辑如下:
  • 向Jobtracker请求3个新的job id (JobTracker.getNewJobId());
  • 自己商讨作业的出口表达,如已存在抛错误给客户端;计算作业的输入分片;
  • 将运行作业所必要的财富(包含作业jar文件,配置文件和计算机才能钻探所得的输入分片)复制到jobtracker的文件系统中以jobid命名的目录下。作业jar别本较多(mapred.submit.replication
    = 10);
  • 告知jobtracker作业计划推行 (submit job)。

简短的话呢,就是

  • 透过JobClient提交,与JobTracker通讯获得一个jar的积攒路线和JobId
  • 自己商讨输入输出的路径
  • 估测计算分片的新闻
  • 将作于所急需的能源(jar,配置文件,计算所得的输入分片)赋值到以作业ID命名的HDFS上
  • 报告JobTracker作业希图执行
  1. 作业的初叶化
  • job
    tracker接收到对其submitJob()方法的调用后,将其放入当中队列,交由job
    scheduler进行调治,并对其进行初叶化,包涵创立2个正值运维作业的目的(封装任务和著录音讯)。
  • 为了创建使时局转列表,job
    scheduler首先从共享文件系统中收获JobClient已统计好的输入分片消息,然后为种种分片创建2个map任务;成立的reduce职分数量由JobConf的mapred.reduce.task属性决定,schedule成立相应数据的reduce职务。职分此时被钦定ID。

简轻松单的话呢,便是

  • JobTracker配置好Job必要的财富后,JobTracker就会开始化作业
  • 初步化首要做的是将Job放入1个里面的行列,让配置好的课业调节器能调治到这几个作业,作业调治器会初叶化那个job。
  • 开始化正是创办八个正在运作的job对象(封装任务和记录音信),以便JobTracker追踪job的事态和进度。
  • 伊始化实现后,作业调节器会博得输入分片消息(input
    split),每一个分片创造2个map职务。
  1. 任务的分红
  • jobtacker应该先选取哪个job来运转?那几个由job scheduler来决定
  • jobtracker如何选择tasktracker来运维选中作业的天职吗?看上边
  • 各类tasktracker定时发送心跳给jobtracker,告知自个儿还活着,是还是不是还不错新的职务。
    jobtracker以此来调控将任务分配给何人(还是使用心跳的重临值与tasktracker通讯)。
    各种tasktracker会有定位数量的职责槽来管理map和reduce(举例二,表示tasktracker能够而且运维七个map和reduce),由机械内核的数额和内部存款和储蓄器大小来决定。job
    tracker会先将tasktracker的map槽填满,然后分配reduce任务到tasktracker。
  • jobtracker选拔哪位tasktracker来运营map任务急需考虑互联网地方,它会选拔一个离输入分片较近的tasktracker,优先级是数量当地化(data-local)–>机架本地化(rack-local)。
  • 对此reduce职务,未有怎么正儿捌经来抉择哪个tasktracker,因为无法思考数据的本地化。map的出口始终是急需经过整治(切分排序合并)后通过网络传输到reduce的,只怕四个map的出口会切分出某些送给二个reduce,所以reduce职责大可不必选取和map同样或目前的机械上。
  1. 义务的施行
  • tasktracker分配到一个任务后,首先从HDFS中把作业的jar文件复制到tasktracker所在的地头文件系统(jar本地化用来运营JVM)。同时将应用程序所供给的漫天文件从布满式缓存复制到本地磁盘。
  • 接下去tasktracker为任务新建一个本地下工作作目录,并把jar文件的始末解压到那么些文件夹下。
  • tasktracker新建四个taskRunner实例来运作该职责。
    TaskRunner运行七个新的JVM来运行每种义务,以便客户的map/reduce不会潜移默化tasktracker守护进程。但在区别义务之间重用JVM依旧也许的。
    子进度经过umbilical接口与父进度打开通讯。任务的子进度每隔几秒便告知父进度的快慢,直到任务成功。
  1. 进度和气象的立异
  • 叁个功课和每一个职分都有三个场所音讯,包涵:作业或职务的运作情形(running,
    successful, failed),map和reduce的速度,计数器值,状态信息或描述。
  • task会定时向tasktracker汇报执市场价格况,tasktracker会按时收集所在集群上的有着task消息,并想JobTracker汇报.JobTracker会依据全部tasktracker汇报上来的音信实行聚焦。
  • 这一个音讯通过一定的岁月间隔由child JVM –> task tracker –> job
    tracker集聚。job
    tracker将产生3个评释全部运转作业及其职分意况的全局视图。你能够透过Web
    UI查看。同时JobClient通过每秒查询jobtracker来得到新型景况。
  1. 学业的做到
  • JobTracker是在抽出到最终3个任务到位后,才将义务标识为”成功”。并将数据结果写入到HDFS上。
  1. 作业的败诉
  • JobTracker失利:JobTracker失利那是Infiniti惨重的1种职分战败,战败机制–它是二个单节点故障,因而,作业注定退步。(hadoop2.0缓慢解决了)。
  • tasktracker失败:tasktracker退步崩溃了会告一段落向jobt发送心跳消息,并且JobTracker会将tasktracker从等待的天职池中移除,将该职分转移到别的的地方实践.obTracker会将tasktracker插足到黑名单。
  • task退步:map或reduce运转退步,会向tasktracker抛出尤其,职分挂起.

在简要介绍了MapReduce框架后,下边深切学习框架中的七个首要概念:Mapper和Reducer,正如上文提到了,它们构成了MapReduce作业并担任完结实际的事情逻辑处理。

               注:
本文最后,还会附着壹套自身写的HDFS的公文操作Java
API,使用起来也很便利,API还在频频的完善..

MapReduce职业涉及到的多少个目的

  1. 客户端(client):编写mapreduce程序,配置作业,提交作业,那正是程序猿实现的办事。
  2. JobTracker:起始化作业,分配作业,与TaskTracker通讯,协调治个作业的试行。
  3. TaskTracker:保持与JobTracker的通讯,在分配的数码片段上实行Map或Reduce职务,TaskTracker和JobTracker的不一致有个很主要的方面,正是在实施职责时候TaskTracker能够有n四个,JobTracker则只会有一个(JobTracker只好有二个就和hdfs里namenode同样存在单点故障,小编会在背后的mapreduce的连锁主题材料里讲到那些主题素材的)。
  4. HDFS:保存作业的多寡、配置消息等等,最终的结果也是保存在hdfs上边
    <small>jobtracker的单点故障:
    jobtracker和hdfs的namenode一样也设有单点故障,
    单点故障一向是hadoop被人非议的大标题,
    何以hadoop的做的文件系统和mapreduce总计框架都以高容错的,可是最要害的管理节点的故障机制却如此不好,小编觉着主即便namenode和jobtracker在其实运营中都以在内部存款和储蓄器操作,而成功内部存款和储蓄器的容错就相比较复杂了,只有当内部存款和储蓄器数据被持久化后容错才好做,namenode和jobtracker都能够备份自个儿持久化的公文,不过那个持久化都会有延迟,因而真正出故障,任然不能够完全回复,其它hadoop框架里富含zookeeper框架,zookeeper能够整合jobtracker,用几台机器同时配备jobtracker,保障①台出故障,有壹台即刻能互补上,可是那种办法也无奈恢复生机正在跑的mapreduce职务。
    </small>

Mapper是单身的天职,将输入记录调换为中等记录,即对输入的键值对张开处理,并出口为一组中间键值对,输出的键值对使用context.write(WritableComparable,
Writable)方法搜罗起来,中间记录的键值类型不必与输入记录的键值类型一样,实际上也往往是见仁见智的。一条输入记录经由Mapper管理后或者输出为0条要么多条在那之中记录。比如,假设输入记录不满足职业须要(未有包蕴特定的值或许隐含了一定的值)的话,能够平素回到,则会输出0条记下,此时Mapper起了过滤器的作用。

 

MapRedece作业的管理流程

MapRedece作业的管理流程图解

遵照时间各类包含:

  • 输入分片(input split)
  • map阶段
  • combiner阶段
  • shuffle阶段和
  • reduce阶段。
  1. 输入分片(input split):
    在张开map计算在此以前,mapreduce会依据输入文件总括输入分片(input
    split),各种输入分片(input split)针对多个map职责
    输入分片(input
    split)存款和储蓄的绝不数据自己,而是3个分片长度和1个笔录数据的地方的数组,输入分片(input
    split)往往和hdfs的block(块)关系很密切
    <small>只要大家设定hdfs的块的分寸是6肆mb,假如大家输入有四个文本,大小分别是三mb、陆五mb和127mb,那么mapreduce会把三mb文件分为一个输入分片(input
    split),65mb则是多少个输入分片(input
    split)而1贰柒mb也是四个输入分片(input split)
    即大家只要在map总括前做输入分片调节,比方合并小文件,那么就会有两个map职分将施行,而且每一个map实践的多寡大小不均,那么些也是mapreduce优化计算的2个关键点。

    地点提交MapReduce作业至集群,大数额学习day_五。</small>
  2. Map阶段:
    程序猿编写好的map函数了,由此map函数功效相对好调节,而且貌似map操作都以本地化操作也正是在数额存款和储蓄节点上开展。
  3. Combiner阶段:
    combiner阶段是程序猿能够选用的,combiner其实也是一种reduce操作。
    Combiner是三个本地化的reduce操作,它是map运算的继续操作,主要是在map总结出中间文件前做3个回顾的联结重复key值的操作。
    <small>举例我们对文本里的单词频率做总计,map计算时候若是遇上贰个hadoop的单词就会记录为一,可是这篇小说里hadoop恐怕会师世n数次,那么map输出文件冗余就会过多,因而在reduce总结前对同一的key做3个合并操作,那么文件会变小,那样就拉长了宽带的传输效能,毕竟hadoop计算力宽带能源往往是总计的瓶颈也是最为高贵的财富,但是combiner操作是有高风险的,使用它的尺码是combiner的输入不会影响到reduce计算的末段输入,
    诸如:如果总结只是求总量,最大值,最小值能够行使combiner,可是做平均值计算使用combiner的话,最后的reduce总括结果就会出错。
    </small>
  4. shuffle阶段:
    将map的出口作为reduce的输入的进度正是shuffle了。
  5. reduce阶段:
    和map函数同样也是程序员编写的,最后结出是储存在hdfs上的。

随即MapReduce框架将与给定键相关联的保有中等值分组,然后传递给Reducer。用户能够透过Job.setGroupingComparatorClass(Class)方法钦点Comparator来支配分组。Mapper的输出被排序然后根据Reducer分区,总的分区数与作业运营的Reducer职务数同样,程序员能够透过完成自定义的Partitioner调整输出的笔录由哪位Reducer管理,暗中同意使用的是HashPartitioner。技士还能够通过Job.setCombinerClass(Class)内定一个combiner来推行中间输出的地点聚合,那促进收缩Mapper到Reducer的多寡传输。Mapper的高中级输出经过排序后总是保存为(key-len,
key,value-len,
value)的格式,应用程序能够通过Configuration调整是还是不是将中间输出进行削减,以及选用何种压缩方式,相关的多少个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序猿通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable,
Writable,
Context)管理该职责的股票总市值对,应用程序能够覆盖cleanup(Context)方法达成其余索要的清理职业。


Combiner深远通晓

MapReduce流程

在上述进度中,大家看来至少四个天性瓶颈:

(一)假诺大家有10亿个数据,Mapper会生成十亿个键值对在互连网间开始展览传输,但只要大家只是对数据求最大值,那么很醒目的Mapper只需求输出它所知道的最大值就能够。那样做不仅可以缓慢消除网络压力,一样也足以大幅提升程序功能。
  总结:互联网带宽严重被占降低程序效用

(2)若是使用美利坚协作国专利数量集中的国家一项来解说数据倾斜这一个概念,那样的数额远远不是1致性的也许说平衡遍及的,由于超过半数专利的国度都属于美利哥,那样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等等,大许多的键值对终极集聚焦于3个10足的Reducer之上,总括任务分配不够均衡,从而大大下落程序的性质。
  总结:单一节点承载过重下落程序品质

在MapReduce编制程序模型中,在Mapper和Reducer之间有贰个特出关键的零部件,它消除了上述的品质瓶颈难点,它正是Combiner

一与mapper和reducer差别的是,combiner未有私下认可的完结,必要显式的安装在conf中才有机能。
2并不是具备的job都适用combiner,唯有操作满意结合律的才可设置combiner。
combine操作看似于:opt(opt(1, 二, 三), opt(肆, 五,
六))。假若opt为求和、求最大值的话,能够使用,可是即便是求平均值的话,则不适用。

因为每1个map都可能会时有发生多量的当地输出,Combiner的作用正是对map端的输出先做3次联合,以调整和减弱在map和reduce节点之间的数目传输量,以加强互连网IO质量。是MapReduce的1种优化花招。

Combiner总结:
在骨子里的Hadoop集群操作中,大家是由多台主机一同打开MapReduce的,
假设进入规约(Combiner)操作,每一台主机会在reduce此前开始展览一回对本机数据的规则,
下一场在经过集群开始展览reduce操作,那样就会大大节约reduce的年月,
就此加快MapReduce的管理速度

MapReduce框架为各类由作业的InputFormat生成的InputSplit运行一个map职分,因而总的map职务数量由输入数据大小决定,更加纯粹说是由输入文件总的块数决定。纵然可认为较少使用CPU的map任务在节点上设置300个map职分,但每一个节点更契合并行运转10-九十四个map职务。由于任务的启航须要费用一些时日,所以职责的运维最棒至少供给一秒钟,因为要是使时局营的日子很少,整个作业的光阴将大多消耗在职务的确立地方。

 

Partitioner理解

Map阶段

<small>step一.叁就是二个分区操作。通过前边的上学大家清楚Mapper最后管理的键值对<key,
value>,是急需送到Reducer去联合的,合并的时候,有同等key的键/值对会送到同多个Reducer节点中进行归并。哪个key到哪个Reducer的分红进程,是由Partitioner规定的。在一些集群应用中,比如分布式缓存集群中,缓存的数量差不离都以靠哈希函数来打开数量的均匀遍布的,在Hadoop中也不例外。
</small>

MapReduce的使用者平常会钦定Reduce职责和Reduce职分输出文件的数量(帕杰罗)。
用户在中游key上运用分区函数来对数码实行分区,之后在输入到持续职责实践进度。3个默许的分区函数式使用hash方法(比方大规模的:hash(key)
mod 路虎极光)实行分区。hash方法能够爆发至极平衡的分区。
默许的分区函数HashPartitioner

Partitioner小结:分区Partitioner首要成效在于以下两点

  • 基于职业要求,爆发八个出口文件
  • 五个reduce任务并发运行,升高全部job的运作功用

Reducer将全数一样键的一组中间值下落为一组越来越小数码的值,比方合并单词的数码等。二个作业运营的Reducer数量能够通过Job.setNumReduceTasks(int)可能mapred-site.xml中的参数mapreduce.job.reduces设置,可是更推荐前者,因为可以由程序员决定运行多少个reducer,而后者更加多的是提供了一种默许值。程序猿使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key,
(list of values)>调用reduce(WritableComparable,
Iterable<Writable>,
Context)方法,同Mapper一样,技术员也能够覆盖cleanup(Context)方法钦点供给的清管事人业。

 

Shuffle理解

Reduce阶段

Shuffle是什么
本着四个map职分的输出依据分裂的分区(Partition)通过互连网复制到差别的reduce任务节点上,那些进度就称作为Shuffle。

Hadoop的shuffle进程纵然从map端输出到reduce端输入之间的经过,这1段应该是Hadoop中最中央的1对,因为关乎到Hadoop中最弥足尊敬的互联网能源,所以shuffle进程中会有多数可以调理的参数,也有广大战略能够商量

map进程的出口是写入本地球磁性盘而不是HDFS,可是一起初数据并不是一贯写入磁盘而是缓冲在内部存储器中,缓存的益处便是压缩磁盘I/O的支出,提升合并和排序的快慢。又因为默许的内部存款和储蓄器缓冲大小是十0M(当然那些是足以安插的),所以在编写制定map函数的时候要尽量缩短内部存款和储蓄器的选拔,为shuffle进度预留越来越多的内部存款和储蓄器,因为该进程是最耗费时间的进度

再来详细梳理下全部工艺流程

Map端

  1. 在map端首先是InputSplit,在InputSplit中富含DataNode中的数据,每种InputSplit都会分配贰个Mapper职责,Mapper任务实现后发生<K2,V贰>的输出,那一个输出先存放在缓存中,每种map有四个环形内部存款和储蓄器缓冲区,用于存款和储蓄任务的输出。私下认可大小100MB(io.sort.mb属性),1旦达到规定的规范阀值0.八(io.sort.spil
    l.percent),二个后台线程就把内容写到(spill)Linux本地球磁性盘中的内定目录(mapred.local.dir)下的新建的二个溢出写文件。
  2. 写磁盘前,要拓展partition、sort和combine等操作。通过分区,将不相同类型的数额分开管理,之后对两样分区的多少开始展览排序,如若有Combiner,还要对排序后的多寡进行combine。等结尾记录写完,将全方位溢出文件合并为一个分区且排序的文本。
  3. 说起底将磁盘中的数据送到Reduce中,图中Map输出有几个分区,有多少个分区数据被送到图示的Reduce职务中,剩下的四个分区被送到别的Reducer职责中。而图示的Reducer职分的此外的四个输入则来自其余节点的Map输出。

Reduce端

  1. Copy阶段:Reducer通过Http格局获取输出文件的分区。
      reduce端可能从n个map的结果中获取数据,而这么些map的施行进程不尽同样,当在那之中一个map运转甘休时,reduce就会从JobTracker中获得该音讯。map运转甘休后TaskTracker会获得信息,进而将新闻汇报给JobTracker,reduce定期从JobTracker获取该消息,reduce端私下认可有6个数据复制线程从map端复制数据
  2. Merge阶段:假如形成七个磁盘文件会张开合并。
      从map端复制来的数据首先写到reduce端的缓存中,一样缓存占用达到一定阈值后会将数据写到磁盘中,同样会开始展览partition、combine、排序等经过。假设产生了多个磁盘文件还会举办合并,最后一遍联合的结果作为reduce的输入而不是写入到磁盘中
    3.Reducer的参数:最终将联合后的结果作为输入传入Reduce职务中

小结:当Reducer的输入文件规定后,整个Shuffle操作才最后甘休。之后就是Reducer的进行了,最终Reducer会把结果存到HDFS上。

Reducer的管理进度首要归纳四个阶段:shuffle(洗牌)、sort(分类)和reduce。在shuffle阶段,MapReduce框架通过HTTP获取具备Mapper输出的相关分区。在Sort阶段,框架依据键分组Reducer的输入(不相同的mapper大概输出一样的键)。Shuffle和sort是同时举行的,获取Mapper的出口后然后联合它们。在reduce阶段,调用reduce(WritableComparable,
Iterable<Writable>处理<key, (list of
values)>对。Reducer的出口经常通过Context.write(WritableComparable,Writable)写入文件系统,举例HDFS,当然也足以经过行使DBOutputFormat将出口写入数据库。Reducer的输出是未经排序的。

效果:

Hadoop的压缩

Shuffle进程中看看,map端在写磁盘的时候使用压缩的艺术将map的出口结果开始展览削减是三个滑坡互联网开辟很得力的形式

在Java中装置输出压缩

reduce端输出压缩使用了Codec中的Gzip算法,也得以接纳bzip二算法

固然不需求Reducer,能够应用Job.setNumReduceTasks(int)将Reducer的多少设置为0(假设不利用该措施设置Reducer的多寡,由于mapreduce.job.reduces默感觉一,会运营1个Reducer),在那种场所下,Mapper的出口将直接写入FileOutputFormat.setOutput帕特h(Job,Path)钦点的不2秘技中,并且MapReduce框架不会对Mapper的出口举行排序。

 

MapReduce排序分组

排序: 在Hadoop默许的排序算法中,只会针对key值进行排序

自定义排序:

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

自定义类型MyNewKey达成了WritableComparable的接口,
该接口中有三个compareTo()形式,当对key进行相比时会调用该方式,而大家将其改为了我们温馨定义的可比规则,从而达成大家想要的作用

分组:在Hadoop中的默许分组规则中,也是基于Key实行的,会将壹律key的value放到二个聚集中去

自定义分组:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

自定义了2个分组相比器MyGroupingComparator,该类达成了RawComparator接口,而RawComparator接口又落成了Comparator接口,那四个接口的概念:

若是在张开reduce从前想采用与分组中间键时分化的可比规则,能够通过Job.setSortComparatorClass(Class)钦命不相同的Comparator。也正是Job.setGroupingComparatorClass(Class)调节了哪些对中级输出分组,而Job.setSortComparatorClass(Class)调控了在将数据传入reduce以前开始展览的第二次分组。

1、  HelloWorld文本

Hadoop数据类型

Hadoop
MapReduce操作的是键值对,但那个键值对并不是Integer、String等职业的Java类型。为了让键值对能够在集群上移动,Hadoop提供了壹部分兑现了WritableComparable接口的基本数据类型,以便用这么些类型定义的多少能够被体系化进行互联网传输、文件存款和储蓄与大小比较。

  • 值:仅会被轻便的传递,必须达成Writable或WritableComparable接口。
  • 键:在Reduce阶段排序时须要开始展览相比较,故只好促成WritableComparable接口。
描述
BooleanWritable 标准布尔变量的封装
ByteWritable 单字节数的封装
DoubleWritable 双字节数的封装
FloatWritable 浮点数的封装
IntWritable 整数的封装
LongWritable Long的封装
NullWritable 无键值时的占位符
Text 使用UTF-8格式的文本封装

分裂于Mapper的数额由输入文件的大大小小鲜明,Reducer的多少能够由技师显著设置,那么设置有个别Reducer能够直达较好地功用呢?Reducer的数据限制为:(0.九伍
~1.75 ) * 节点数量 *
每一个节点上最大的器皿数。参数yarn.scheduler.minimum-allocation-mb设置了各样容器可伸手的纤维内存,那么最大容器数可依附总的内部存款和储蓄器除以该参数总括得出。当使用0.75时,全数的Reducer会被立刻加载,并当Mapper达成时初阶传输Mapper的输出。使用壹.7伍时,相当的慢的节点将成功它们第二轮的职责,然后加载第1波职责,那样对负载平衡具备更加好的功效。扩充Reducer的多少即便扩充了框架费用,但扩大了负载平衡和滑降了输球的本钱。下边包车型大巴百分比因子比总的Reducer数量稍微少量,认为预测试行的天职和挫败的天职保留少许的Reducer槽,也正是实际上的Reducer数量为地点公式得出的数码拉长保留的Reducer数量。

 

CentOS设置和布署Hadoop2.2.0 
http://www.linuxidc.com/Linux/2014-01/94685.htm

a b c d
e f a c
a c b f

Ubuntu
13.04上搭建Hadoop环境
http://www.linuxidc.com/Linux/2013-06/86106.htm

 

Ubuntu 12.10 +Hadoop 一.2.1版本集群配置
http://www.linuxidc.com/Linux/2013-09/90600.htm

 

Ubuntu上搭建Hadoop情况(单机情势+伪布满形式)
http://www.linuxidc.com/Linux/2013-01/77681.htm

澳门金沙国际 2

Ubuntu下Hadoop情形的配置
http://www.linuxidc.com/Linux/2012-11/74539.htm

 

单机版搭建Hadoop情形图文教程详解
http://www.linuxidc.com/Linux/2012-02/53927.htm

 

搭建Hadoop情形(在Winodws情况下用编造机虚拟多个Ubuntu系统开始展览搭建)
http://www.linuxidc.com/Linux/2011-12/48894.htm

 

越来越多Hadoop相关消息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13


本文恒久更新链接地址:http://www.linuxidc.com/Linux/2014-11/109286.htm

 

澳门金沙国际 3

 

 

2、上传至HDFS的intput目录下

 

 

澳门金沙国际 4

 

 

 


 

 

三、客户端提交Job,实行MapReduce

 

 

A、map的出口结果

 

 

澳门金沙国际 5

 

 

 


B、reduce的输入结果(上一步map的输出)

 

 

澳门金沙国际 6

 

 

 


 

 

C、reduce的妄想结果(代码完结细节先一时忽略,文章中会讲到)

 

 

 澳门金沙国际 7

 

 

 


 

 

D、最终,待义务总体成功,交由HDFS实行结果的文书写入

 

 

澳门金沙国际 8

 

 

澳门金沙国际 9

 

 


 

 

澳门金沙国际 10

 

 

 

MapReduce中的分区暗中认可是哈希分区,可是大家也能够友善写demo来重写Partitioner类的getPartiton方法,如下:

 

 

 

澳门金沙国际 11

 

 

分区规则定后,大家须求内定客户端Job的map
task的分区类并安装reducer的个数,如下

 

 

澳门金沙国际 12

 

 

 

终极,提交Job跑一次MapReduce的意义如下:

 

 

澳门金沙国际 13

 

 

 

咱俩独家下载文件*.*-00000和*.*-0000壹至地点,并张开结果印证,效果如下:

 

 

分区0对应的reduce结果文件如下:

 

 

澳门金沙国际 14

 

 

 

分区1对应的reduce结果文件如下:

 

 

 

澳门金沙国际 15

 

 

 

迄今停止,小编用效应图的不二等秘书籍,给大家演示了刹那间MapReduce的前前后后到底是哪些进展map和reduce的,在那之中产生了怎么,最终又生出了何等,很直观,但是,注意,效果图只好救助您领悟MapReduce最后能干什么,具体MapReduce内部的行事规律如何,作者上面会持续讲到,而且,博文的尾声,小编会附上本篇博文演示要用到的全demo。

 

 

 


 

 

 

 

一、什么是MapReduce

 

 

我们要数教室里面包车型客车有着书,你数一号书架,小编数2号书架,他数三号书架…那就叫Map

目前大家把全部人计算的书本数加在(归并)一同,那就叫“Reduce”

合起来就是MapReduce【布满式大数目管理成效】!!!

详尽的请查看:MapReduce是1种编制程序模型,用于大规模数据集(大于一TB)的互动运算

 

 

 

贰、MapReduce试行进度

 

 

 

 

澳门金沙国际 16

 

补充:

 

1、JobTracker  对应于
NameNode,TaskTracker 对应于 DataNode。

贰、JobTracker是1个master服务,软件运营以往JobTracker接收Job,担负调节Job的每叁个子职务task运营于TaskTracker上,并监察和控制它们,假诺开采有退步的task就再一次运转它。一般情状应该把JobTracker布署在单独的机器上。

 

 

Client        
:客户端提交三个Job给JobTracker

JobTracker
:调整Job的每一个mapper和reducer,并运转在TaskTracker上,并监察和控制它们

Mapper    
 :获得适合本人的InputSplit(输入内容),进行map后,输出MapOutPut(map的输出内容)

Reducer     :
获得MapOutPut作为友好的ReduceInput,进行reduce总计,最后输出OutPut结果

 

 

 

 

 

叁、MapReduce工作规律

 

 

 

 

 澳门金沙国际 17

 

 

 

注:reducer起头reduce在此之前,一定要等待mapper达成map后才干开端

 

先是步:Job读取存在于
HDFS文件系统上的文件作为Mapper的输入内容(key,value)并经过尤其的管理(map编程达成)交由Mapper
Task

其次步:Mapper
Task对map后的多寡进行shuffle(洗牌,重组),包涵分区、排序或联合(combine)

其三步:Mapper把shuffle后的输出结果(key,values)提供给相应的Reducer
Task,由Reducer取走

第4步:Reducer得到上一步Mapper的出口结果,进行reduce(客户端编制程序落成)

第4步:Reducer实现reduce总结后,将结果写入HDFS文件系统,分裂的reducer对应区别的结果文件

 

 

强行的图(本人画的)如下

 

 

澳门金沙国际 18

 

 

 

 

四、Shuffle进程简单介绍

 

 

 

 

 澳门金沙国际 19

 

 

 

     
上海体育场所要求小心的正是shuffle进程中的分区,图中很直观的认证了,mapper最终会把key-value键值对数码(输入数据)从缓存中拿出(缓存数据溢出)并依赖分区规则举行磁盘文件的写入(注意:那里的磁盘文件不是HDFS文件系统上的公文,且写入文件的内容早已是排序过的了),同时会对分裂分区的key-value数据文件举行1个合并,最终分给差异的Reduce职责张开reduce管理,借使有七个Mapper,则Reducer从Map端获取的剧情须要重新开始展览联合(把属于分裂的Mapper但属于同三个分区的出口的结果开展合并,并在reduce端也实行shuffle进程,写入磁盘文件,最后进行reduce总计,reduce计算的结果最终以文件的花样出口到HDFS文件系统中)

 

 

 

 

 

五、Map端的Shuffle过程

 

 

 

 

 

 澳门金沙国际 20

    

 须要通晓的是:

 

壹、缓存的高低是能够安装的(mapreduce.task.io.sort.mb,暗中认可100M)

2、溢出比(缓存使用率有叁个软阈值
== mapreduce.map.sort.spill.percent,暗中同意0.80),当超过阈值时,溢出作为会在后台起三个线程施行从而使Map职务不会因为缓存的溢出而被打断。但只要达到硬限制,Map职责会被封堵,直到溢骑行为终止

 

 

 

陆、怎么样编写map和reduce函数

 

 

 

 

        到这一步,若是您对MapReduce的行事规律已经调节了,那么接下去,编写客户端程序,利用MapReduce的乘除成效,实现文件文件中单词的产出次数的计算,将会是轻易的。

       

     
 首先,我们必要2个mapper(职责),其次是reducer(职务),有了多个职责后,大家要求创设2个Job(作业),将mapper和reducer关联起来,并交给至Hadoop集群,由集群中的JobTracker实行mapper和reducer职责的调整,并最终成就多少的计量职业。

     
 由此,简单察觉,光有mapper和reducer任务,是心有余而力不足开始展览MapReduce(布满式大数据估测计算)的,那里我们需求写多个类,3个是落成Map的类,3个是兑现Reduce的类,还有三个就是付诸作业的主类(Client
Main Class)

      

     
 由于博主的Hadoop版本是三.壹.0的,因此,为了兼顾三.X以下的Hadoop集群情况能够在上面提供的demo中可见跑起来,特将本文中涉嫌到的Hadoop重视换到了二.七.X的版本,如下:

 

 

 

在意:不要采用老式的hadoop-core(一.二.一)重视,不然谋面世各个意想不到的的主题素材

       

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- common 依赖 tools.jar包 -->
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.8</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

 

 

 

(1)编写Mapper

 

 

澳门金沙国际 21

 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper 原型 : Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * 
 * KEYIN    : 默认情况下,是mr框架所读到的一行文本内容的起始偏移量,Long,
 *            但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
 * VALUEIN  : 默认情况下,是mr框架所读到的一行文本的内容(Java String 对应 Hadoop中的Text)
 * KEYOUT   : 用户自定义逻辑处理完成之后输出数据中的key,在此处是单词(String),同上用Text
 * VALUEOUT : 用户自定义逻辑处理完成之后输出数据中的value,在这里是单词的次数:Integer,对应Hadoop中的IntWritable
 * 
 * mapper的输入输出参数的类型必须和reducer的一致,且mapper的输出是reducer的输入
 * 
 * @blob   http://blog.csdn.net/appleyk
 * @date   2018年7月3日15:41:13
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{


    /**
     * map实现数据拆分的操作
     * 本操作主要进行Map的数据处理
     * 在Mapper父类里面接受的内容如下:
     * LongWritable:文本内容的起始偏移量
     * Text:每行的文本数据(行内容)
     * Text:每个单词分解后的统计结果
     * IntWritable:输出记录的结果
     */
     @Override
     protected void map(LongWritable key, Text value,Context context)
             throws IOException, InterruptedException {

         System.out.println("文本内容的起始偏移量:"+key);
         String line = value.toString()    ;//取出每行的数据
         String[] result  = line.split(" ");//按照空格进行数据拆分
         //循环单词
         for (int i = 0 ;i <result.length ; i++){

           //针对每一个单词,构造一个key-value
            System.out.println("key-value : <"+new Text(result[i])+","+new IntWritable(1)+">");


           /**
            * 将每个单词的key-value写入到输入输出上下文对象中
            * 并传递给mapper进行shuffle过程,待所有mapper task完成后交由reducer进行对号取走
            */
             context.write(new Text(result[i]), new IntWritable(1));
         }

         /**        map端的shuffle过程(大致简单的描述一下)
          *                       |
          *                       |  放缓存(默认100M,溢出比是0.8,即80M满进行磁盘写入并清空,
          *                       |  剩余20M继续写入缓存,二者结合完美实现边写缓存边溢写(写磁盘))
          *                       V
          *               <b,1>,<c,1>,<a,1>,<a,1>
          *                         
          *                       |
          *                       | 缓存写满了,开始shuffle(洗牌、重组)  == 包括分区,排序,以及可进行自定的合并(combine)
          *                       V     
          * 写入磁盘文件(not hdfs)并进行文件归并,成一个个的大文件 <a,<1,1>>,<b,1>,<c,1>   
          * 
          *                         |
          *                         |
          *                         V
          *   每一个大文件都有各自的分区,有几个分区就对应几个reducer,随后文件被各自的reducer领走
          *   
          *           !!! 这就是所谓的mapper的输入即是reducer的输出 !!!
          */
     }
}

 

 

 

 

 

 

(2)编写Reducer

 

 

 

澳门金沙国际 22

 

  

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 进行合并后数据的最终统计
 * 本次要使用的类型信息如下:
 * Text:Map输出的文本内容
 * IntWritable:Map处理的个数
 * Text:Reduce输出文本
 * IntWritable:Reduce的输出个数
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {

    //mapper的输出是reducer的输入,因此,这里打一下reducer的接收内容            
    List<Integer> list = new ArrayList<>();

    int sum = 0;//记录每个单词(key)出现的次数
        for (IntWritable value : values) {
            //从values集合里面取出key的单个频率数量(其实就是1)进行叠加
            int num = value.get();
            sum += num;
            list.add(num);

        }

       /**
    * mapper会把一堆key-value进行shuffle操作,其中涉及分区、排序以及合并(combine)
    * 注:上述shuffle中的的合并(combine)区别于map最终的的合(归)并(merge)
    * 比如有三个键值对:<a,1>,<b,1>,<a,1>
    * combine的结果:<a,2>,<b,1>      == 被reducer取走,数据小
    * merage 的结果;<a,<1,1>>,<b,1>  == 被reducer取走,数据较大(相比较上述combine来说)
    * 注:默认combiner是需要用户自定义进行开启的,所以,最终mapper的输出其实是归并(merage)后的的结果
    * 
    * 所以,下面的打印其实就是想看一下mapper在shuffle这个过程后的merage结果(一堆key-values)
    */
    System.out.println("key-values :<"+key+","+list.toString().replace("[", "<")
                .replace("]", ">")+">");

    //打印一下reduce的结果
    System.out.println("reduce计算结果 == key-value :<"+key+","+new IntWritable(sum)+">");
    //最后写入到输入输出上下文对象里面,传递给reducer进行shuffle,待所有reducer task完成后交由HDFS进行文件写入
    context.write(key, new IntWritable(sum));


    }
}

 

 

(三)编写Partition分区类(假如急需修改Map暗中同意的哈希分区规则的话)

 

 

澳门金沙国际 23

 

 

 

 1 import org.apache.hadoop.io.IntWritable;
 2 import org.apache.hadoop.io.Text;
 3 import org.apache.hadoop.mapreduce.Partitioner;
 4 
 5 public class PartitionTest extends Partitioner<Text, IntWritable> {
 6 
 7     /**
 8      * key          : map的输出key 
 9      * value        : map的输出value 
10      * numReduceTask: reduce的task数量
11      * 返回值,指定reduce,从0开始
12      * 比如,分区0交由reducer0拿走
13      */
14     @Override
15     public int getPartition(Text key, IntWritable value, int numReduceTask) {
16         
17         if (key.toString().equals("a")) {
18             //如果key的值等于a,则将其分区指定为0,对应第一个reducer拿走进行reduce
19             return 0;
20         } else {
21             return 1;
22         }
23     }
24 }

 

 

 

 

(4)编写Job类(Main Class)

 

 

澳门金沙国际 24

 

 

 

 

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Job;
 6 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 7 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 8 
 9 import com.appleyk.hdfs.mapper.WordCountMapper;
10 import com.appleyk.hdfs.part.PartitionTest;
11 import com.appleyk.hdfs.reducer.WordCountReducer;
12 
13 /**
14  * Client端,提交作业
15  * @author yukun24@126.com
16  * @blob   http://blog.csdn.net/appleyk
17  * @date   2018年7月3日-上午9:51:49
18  */
19 public class WordCountApp {
20 
21     public static void main(String[] args) throws Exception{
22         
23         Configuration conf = new Configuration();
24         //配置uri
25         conf.set("fs.defaultFS", "hdfs://192.168.142.138:9000");
26     
27         //创建一个作业,作业名"wordCount",作用在Hadoop集群上(remote)
28         Job job = Job.getInstance(conf, "wordCount");
29         
30         /**
31          * 设置jar包的主类(如果样例demo打成Jar包扔在Linux下跑任务,
32          * 需要指定jar包的Main Class,也就是指定jar包运行的主入口main函数)
33          */
34         job.setJarByClass(WordCountApp.class);
35         
36         //设置Mapper 任务的类(自己写demo实现map)
37         job.setMapperClass(WordCountMapper.class);
38         //设置Reducer任务的类(自己写demo实现reduce)
39         job.setReducerClass(WordCountReducer.class);
40 
41         //指定mapper的分区类
42         //job.setPartitionerClass(PartitionTest.class);
43         
44         //设置reducer(reduce task)的数量(从0开始)
45         //job.setNumReduceTasks(2);
46         
47         
48         //设置映射输出数据的键(key)  类(型)
49         job.setMapOutputKeyClass(Text.class);
50         //设置映射输出数据的值(value)类(型)
51         job.setMapOutputValueClass(IntWritable.class);
52 
53         //设置作业(Job)输出数据的键(key)  类(型)   == 最后要写入到输出文件里面
54         job.setOutputKeyClass(Text.class);
55         //设置作业(Job)输出数据的值(value)类(型)   == 最后要写入到输出文件里面
56         job.setOutputValueClass(IntWritable.class);
57 
58         //设置输入的Path列表(可以是单个文件也可以是多个文件(目录表示即可))
59         FileInputFormat.setInputPaths (job, new Path("hdfs://192.168.142.138:9000/input" ));
60         //设置输出的目录Path(确认输出Path不存在,如存在,请先进行目录删除)
61         FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.142.138:9000/output"));
62 
63         //将作业提交到集群并等待它完成。
64         boolean bb =job.waitForCompletion(true);
65         
66         if (!bb) {
67             System.out.println("Job作业执行失败!");
68         } else {
69             System.out.println("Job作业执行成功!");
70         }
71     }
72 
73 }

 

 

 

 

 

(5)运营main方法,提交作业

 

 

出现相当:

 

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

 

 

 

出于作业是在本地(Windows)跑的,因而,这里遭逢2个本地IO读写权限的标题,具体代码可以见NativeIO这一个Java类的源码,在此处:

 

 

澳门金沙国际 25

 

 

 

 

将源代码全体拷贝出来,在类型下新建一个同包名同类名称的文书如下:

 

 

澳门金沙国际 26

 

 

 

 

 

开采后,修改代码如下(去掉验证):

 

 

澳门金沙国际 27

 

 

 

 

(陆)再一次运转main方法,提交作业

 

 

再现十分:

 

 

org.apache.hadoop.security.AccessControlException:

Permission denied: user=Administrator, access=WRITE,inode="/":root:supergroup:drwxr-xr-x

 

 

 

     
 意思是再说,笔者当下使用的user是Windows下的Administrator,不过在Hadoop的HDFS文件系统中,未有那几个用户,由此,我想用Administrator这些用户向HDFS文件系统Write的时候现身权力不足的不行,因为HDFS文件系统根目录下的文书对其他用户来说,不有所w和r的权能

     
 原本把mapreduce程序打包放在集群中跑是绝不牵记用户的hdfs权限难题的,然则,我一先导说了,笔者不想那么勤奋,无非正是Hadoop开启了HDFS文件系统的权杖验证效能,小编给它关了(开放)不就行了,因而,小编主宰间接在hdfs-site.xml配置文件里开始展览权力验证的修改,增多内容如下:

 

<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

 

 

 

澳门金沙国际 28

 

 

保存后,重启Hadoop集群

 

先stop,再start

 

 

澳门金沙国际 29

 

 

 

 

(七)再度运营main方法,提交作业

 

 

16:55:44.385 [main] INFO org.apache.hadoop.mapreduce.Job -  map 100% reduce 100%
16:55:44.385 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getTaskCompletionEvents(Job.java:670)
16:55:44.385 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getTaskCompletionEvents(Job.java:670)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.387 [main] INFO org.apache.hadoop.mapreduce.Job - Job job_local539916280_0001 completed successfully
16:55:44.388 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:758)
16:55:44.407 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 35
 File System Counters
  FILE: Number of bytes read=560
  FILE: Number of bytes written=573902
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=46
  HDFS: Number of bytes written=24
  HDFS: Number of read operations=13
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=4
 Map-Reduce Framework
  Map input records=3
  Map output records=12
  Map output bytes=72
  Map output materialized bytes=102
  Input split bytes=112
  Combine input records=0
  Combine output records=0
  Reduce input groups=6
  Reduce shuffle bytes=102
  Reduce input records=12
  Reduce output records=6
  Spilled Records=24
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=3
  Total committed heap usage (bytes)=605028352
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=23
 File Output Format Counters 
  Bytes Written=24
16:55:44.407 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
Job作业执行成功!

 

 

 

ok,至此,当地试行mapreduce作业已经到位,接下去便是查看大家要的结果了

 

 

 

 

(8)利用Java HDFS
API,张开/output/part-r-00000文书内容,输出到调整台

 

 

 

澳门金沙国际 30

 

 

 

a b c d
e f a c
a c b f

 

 

 

澳门金沙国际 31

 

 

 

 

 

 

7、GitHub项目地址

 

 

 

Java HDFS API ,达成公文的蕴藏和走访
并顺便MapReduce作业,本地提交作业至集群达成Word Count的计算

 

 

澳门金沙国际 32

相关文章