Hadoop.MapReduce.简介

本文是2009年9月为公司内部培训写得的一篇简介。

MapReduce概述

提供计算任务的自动并行化机制,使用分发-收集的并行策略,Map阶段处理(无依赖的)原始输入,Reduce阶段处理依赖关系(按Key依赖)。

架构在hadoop之上,原则上可以使用hadoop代理的所有分布式文件系统(hdfs,kfs,s3),但我们目前仅使用hdfs。

MapReduce流程

1.        客户端提交MapReduce任务

2.        JobTracker分发任务,依据输入文件的chunk位置信息,将相应的split分发到TaskTracker

3.        Map.TaskTracker 执行Map任务

a)        读取split

b)        产生输出,输出按i= Hash(K) % R分发至Reduce[i]

 

4.        Reduce.TaskTracker执行Reduce任务

a)        Reduce.shuffle可与Map任务并行执行

b)        甚至sort也可以和Map并行执行

c)        但用户的reduce过程不能用Map并行执行

d)        产生R个最终输出文件,整个MapReduce过程结束

 

MR矩阵与容错

NameNode处理任务调度与错误恢复,因此,在NameNode上,最基本的一个数据结构就是MR[M,R]矩阵。每个Map进程一行,每个Reduce进程一列。

 

 

R1

R2

R3

 

Rj

 

 

 

Rr

M1

 

 

 

 

 

 

 

 

 

M2

 

 

 

 

 

 

 

 

 

M..

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Mi

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Mm

 

 

 

 

 

 

 

 

 

 

每个Map的输出被分成R份,按hash规则,每个Reduce一份,这样当任何一个/多个Reduce任务失败时,重启的Reduce只需要从每个Map的输出读取自己的这一份(绿色列)。

当任何一个/多个Map任务失败——这个很简单,随后重启成功,每个Reduce进程只需要读取自己相应于该Map任务的那些数据(粉色行)。

Map

切分输入

如何切分

大文件(大于1 chunk)

小文件(小于1 chunk)

压缩

不切分

不切分

未压缩

按chunk切分

 

对于大文件,按chunk进行切分,切分程序需要处理chunk边界情况,例如,对于普通文本文件,每行一个记录,chunk边界通常在一行中间,切分程序必须处理这种情况,把跨chunk的记录归入前一个split,因此:

l  需要在网络上传输半条记录

l  并不是每个split的尺寸都精确地等于1chunk

对于定长记录文件,要简单一些,也可以安装这种方式来处理。

 

压缩文件不能进行切分,因为切分后找不到同步点(压缩头)。所以,把压缩文件的尺寸控制在1 chunk内,一方面可以提高Map的并行度,另一方面也可以减少网络传输,因为超出1 chunk的就不在第一个chunk所在的data server了。

 

解析(parse)记录

将输入的字节流parse成一条条记录

 

调用Map.map

调用用户定义的map方法

在Map.map中,除了处理、变换数据,一般还需要调用report,向框架报告进度(执行情况)。

 

Reduce

Reduce.shuffle

可与Map任务并行执行

Reduce.sort

也可以和Map并行执行,然而,一旦有Map任务失败重启,排序结果就作废了!

Reduce.reduce

调用用户定义的reduce函数。这个过程不能用Map并行执行,因为reduce函数需要接受每个Key对应的整个value集合(这个集合可能也是有序的——SecondarySort)。作为一个极端情况,最后完成的那个Map可能包含所有Key,并且value也是最小的!

这个过程,也可能需要向框架报告进度。

这一步将产生最终输出文件,每个Reduce进程一个,整个MapReduce过程结束。因此,MapReduce的输出总是/path/to/reuce/output/part-####,而不是一个单一的文件。这些输出有可能作为后续其它MapReduce过程的输入。

 

Hadoop.MapReduce接口

原生接口

旧接口(0.20以前)

@Deprecated

public interface Mapper<K1, V1, K2, V2>extends
JobConfigurable, Closeable{

  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporterreporter)

  throws IOException;

}

 

@Deprecated

public interface Reducer<K2, V2, K3,V3>
extends JobConfigurable, Closeable {

 

  void reduce(K2 key, Iterator<V2> values,

              OutputCollector<K3, V3>output, Reporter reporter)

    throws IOException;

}

新接口(0.20以后)

public classMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

 

 public abstract class Context implementsMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

 {

  }

  protected void setup(Context context) throws IOException,InterruptedException {

    // NOTHING

  }

  @SuppressWarnings("unchecked")

  protected void map(KEYIN key, VALUEIN value,Context context)

 throws IOException, InterruptedException {

    context.write((KEYOUT) key, (VALUEOUT)value);

  }

  protected void cleanup(Context context) throws IOException, InterruptedException {

    // NOTHING

  }

  public void run(Context context) throws IOException, InterruptedException {

    setup(context);

    while (context.nextKeyValue()) {

      map(context.getCurrentKey(),context.getCurrentValue(), context);

    }

    cleanup(context);

  }

}

 

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{

public abstract class Context implements ReuceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

{

}

 protected void setup(Context context) throws IOException, InterruptedException

{

    // NOTHING

}

  @SuppressWarnings("unchecked")

protected void reduce(KEYINkey, Iterable<VALUEIN> values, Context context)

 throws IOException, InterruptedException {

    for(VALUEIN value: values) {

      context.write((KEYOUT) key, (VALUEOUT)value);

    }

  }

 protected void cleanup(Context context) throws IOException, InterruptedException {

    // NOTHING

  }

  @SuppressWarnings("unchecked")

 public void run(Context context) throws IOException, InterruptedException {

    setup(context);

    while (context.nextKey()) {

      reduce(context.getCurrentKey(),context.getValues(), context);

      // If a back up store is used, reset it

     ((ReduceContext.ValueIterator)(context.getValues().iterator())).resetBackupStore();

    }

    cleanup(context);

  }

}

 

新版更灵活,另一方面,默认的map和reduce都是identity,而以前版本的identity是用专门的类来表达的。

灵活性表现在context参数上,在重构中,这个叫参数提取(Introduce Parameter Object)。如果以后参数需要改变,或者需要插入新的方法,就只需要修改Parameter Object,而不需要修改接口本身。

同时,又为旧的接口提供Adapter/Bridge,以便兼容(二进制兼容+源码兼容)旧程序。

 

示例(WordCount)

Map

  public static class MapClass extends MapReduceBase

    implements Mapper<LongWritable, Text,Text, IntWritable> {

   

    private final static IntWritable one =
new IntWritable(1);

    private Text word = new Text();

   

    public void map(LongWritable key, Text value,

                   OutputCollector<Text, IntWritable> output,

                    Reporterreporter) throws IOException {

      String line =value.toString();

      StringTokenizer itr = newStringTokenizer(line);

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        output.collect(word, one);

      }

    }

  }

Reduce

  public static class Reduce extends MapReduceBase

    implements Reducer<Text, IntWritable,Text, IntWritable> {

   

    public void reduce(Text key, Iterator<IntWritable>values,

                      OutputCollector<Text, IntWritable> output,

                       Reporterreporter) throws IOException {

      int sum = 0;

      while (values.hasNext()) {

        sum += values.next().get();

      }

      output.collect(key, new IntWritable(sum));

    }

  }

 

配置任务

  public int run(String[] args) throws Exception {

    JobConf conf = new JobConf(getConf(),WordCount.class);

    conf.setJobName("wordcount");

 

    // the keys are words (strings)

    conf.setOutputKeyClass(Text.class);

    // the values are counts (ints)

   conf.setOutputValueClass(IntWritable.class);

   

    conf.setMapperClass(MapClass.class);       

    conf.setCombinerClass(Reduce.class);

    conf.setReducerClass(Reduce.class);

   

    List<String> other_args = newArrayList<String>();

    for(int i=0; i < args.length; ++i) {

      try {

        if ("-m".equals(args[i])) {

         conf.setNumMapTasks(Integer.parseInt(args[++i]));

        } else if ("-r".equals(args[i])){

         conf.setNumReduceTasks(Integer.parseInt(args[++i]));

        } else {

          other_args.add(args[i]);

        }

      } catch (NumberFormatException except) {

        System.out.println("ERROR: Integer expected instead of " + args[i]);

        return printUsage();

      } catch (ArrayIndexOutOfBoundsException except) {

        System.out.println("ERROR: Required parameter missing from " +

                          args[i-1]);

        return printUsage();

      }

    }

    // Make sure there are exactly 2 parameters left.

    if (other_args.size() != 2) {

      System.out.println("ERROR: Wrong number of parameters: " +

                         other_args.size() + " instead of 2.");

      return printUsage();

    }

    FileInputFormat.setInputPaths(conf,other_args.get(0));

    FileOutputFormat.setOutputPath(conf,new Path(other_args.get(1)));

       

    JobClient.runJob(conf);

    return 0;

  }

 

Streaming

文本协议的接口,记录以换行符分隔,Key-Value以\t分隔。比原生接口更简单,同时,还可以进行更方便的本地测试——通过管道进行。

原则上,Streaming接口只需要遵循一条:Map.OutputKey==Reduce.InputKey

最简单的一个程序:大客户每天独立用户数计算。MapReduce的输出结果是每个用户和他对应的浏览量,再使用wc -l就可以得出总量,并且,可以从原始输出取得每个用户的浏览量。

Map(为简单起见,省略了错误处理)

#include <stdio.h>

#include <febird/trb_cxx.h>

 

int main(int argc, char* argv[])

{

   size_t len = 0;

char*  line = NULL;

using febird::trbstrmap;

   trbstrmap<int> smap;

   for (;;) {

       ssize_t len2 = getline(&line,&len, stdin);

       if (-1 == len2) break;

       char* ck = strstr(line,"&xnbc=");

       char* end = strstr(ck+6,"HTTP");

       end[-4] = 0;

       smap[ck+6]++;

   }

    for(trbstrmap<int>::iterator iter = smap.begin();iter; ++iter)

        printf("%s\t%d\n",smap.key(iter), *iter);

if (line) free(line);

return 0;

}

 

 

这个Map的逻辑比WordCount.Map要稍微复杂一点,因为在程序中,相当于已经做了Combine,这个Combine比MapReduce本身的Combine要高效得多。

优化无止境,如果需要进一步优化,该程序可以在smap的尺寸达到一定大小时就进行输出,而不必等到处理完全部输入后再输出,这样一方面减小了内存占用,另一方面还提高了并发度——每输出一点数据框架就可以传输一点,进而shuffle、sort。

Reduce

框架传递给Reduce程序的[{key, value}]中,相等的Key总是相邻的,充分利用这一点,可以有效化简程序,并提高性能。然而要利用这一点,在编程上付出的努力就比原生接口要复杂一些,原生接口只需遍历Value集合,而使用Streaming需要自己判断相同Key的集合边界,还要处理一些其他边界问题(代码中黄色行)。

这个Reduce实际上可以作为一个通用的reduce,叫它sumlong,可以计算每个Key发生的次数,只要输入匹配key\tnum,用正则表达式就是:.*?\t[0-9]-?{1,20}

#include <stdio.h>

#include <string.h>

#include <stdlib.h>

 

int main(int argc, char* argv[])

{

   size_t llen = 0, klen = 0;

   char  *line = NULL, *key =(char*)malloc(1024);

   long cnt = 0;

   key[0] = 0;

   for (;;) {

        ssize_t len2 = getline(&line,&llen, stdin);

        if (-1 == len2) {

            if (klen) printf("%s\t%ld\n",key, cnt);

            break;

        }

        char* tab = (char*)memchr(line, '\t',len2);

        if (tab) {

            long cnt2 = atol(tab+1);

            size_t klen2 = tab - line;

            if (klen2 == klen &&memcmp(line, key, klen) == 0)

                cnt += cnt2;

            else {

                if (klen)printf("%s\t%ld\n", key, cnt);

                memcpy(key, line, klen2);

                key[klen2] = 0;

                klen = klen2;

                cnt = cnt2;

            }

        } // if (tab)

   }

   if (line) free(line);

   free(key);

   return 0;

}

 

调用脚本

#Distinct User Sum

if (($# < 1)); then

        echo usage: $0 yyyy_mm_dd[_hh]

        exit

elif (($# < 2)); then

        outFile=/data/leipeng/big/dus/$1

else

        outFile=$2

fi

 

hdir=/opt/hadoop

year=`echo $1 | awk-F"_" '{print $1;}'`

month=`echo $1 | awk-F"_" '{print $2;}'`

day=`echo $1 | awk -F"_"'{print $3;}'`

hour=`echo $1 | awk-F"_" '{print $4;}'`

 

if [[ -z $hour ]] ; then

        pvd=`/opt/hadoop/bin/hadoop fs –ls \
 /user/root/log_data/log_in/pv/$year/$month/$day/*/\
 | awk 'NF>=8{printf("-input %s", $8)}'`

else

        pvd="-input \
/user/root/log_data/log_in/pv/$year/$month/$day/$hour/pvval"

fi

 

echo $pvd

cd $hdir

bin/hadoop fs -rmr/test/dus/output/$1

bin/hadoop jar \

   contrib/streaming/hadoop-*-streaming.jar \

   -conf conf/dus.xml\

   $pvd \

   -output /test/dus/output/$1 \

   -mapper $hdir/aa_dumap \

   -reducer $hdir/aa_duadd

 

bin/hadoop fs -cat "/test/dus/output/$1/part*"| wc -l > $outFile

 

 

如果再加上配置文件conf/dus.xml,整个调用脚本比程序本身还要长。

 

本地测试命令,相当于仅一个Map和一个Reduce进程:

cat pvfiles | aa_dumap | aa_duadd [|wc –l]

 

如果要看所有用户的结果,而非最终统计,就不许要|wc–l。这个比原生接口的测试要简单得多!

使用其它语言

Streaming不光可以使用C/C++,任何语言都可以,比如awk,上面的程序用awk可以更简单——效率也许会低一点,但的确更简单得多得多!

awk.Map

cookie之前有&xnbc=标识,之后有|| HTTP,因此,用这两个串做字段分隔符,cookie内容正好是第二个字段$2。

程序本身不做Combine,让MapReduce框架去做,或者干脆不做。

awk '-F&xnbc=|\\|\\| HTTP''{printf("%s\t1\n", $2)}'

 

awk.Reduce

Reduce不需要自定义字段分隔符,默认的正好

简单的,用内存多点

awk '{km[$1]+=$2}END{for (k inkm){printf("%s\t%d\n", k, km[k]);}}'

复杂点,用内存小点

awk 'p==$1{c+=$2}p!=$1{if(p!="") printf("%s\t%d\n", p, c); p=$1; c=$2;}END{if(NR>0) printf("%s\t%d\n", p, c)}'

 

可读形式:

 

p==$1{c+=$2}

p!=$1{

    if(p!="")

       printf("%s\t%d\n",p, c);

    p=$1;c=$2;

}

END{

# 不能漏掉最后一条,并且,空的输入必须是空的输出

    if(NR>0)

       printf("%s\t%d\n",p, c)

}

 

调用脚本的复杂性还是一样的,不过,使用awk,可以直接把awk程序写在调用脚本中,就不需要任何其它程序了。

 

更多控制选项

每个任务可调的参数

Name

Type

Description

mapred.job.id

String

The job id

mapred.jar

String

job.jar location in job directory

job.local.dir

String

The job specific shared scratch space

mapred.tip.id

String

The task id

mapred.task.id

String

The task attempt id

mapred.task.is.map

boolean

Is this a map task

mapred.task.partition

int

The id of the task within the job

map.input.file

String

The filename that the map is reading from

map.input.start

long

The offset of the start of the map input split

map.input.length

long

The number of bytes in the map input split

mapred.work.output.dir

String

The task's temporary output directory

 

其它参数是全局选项,可参考hadoop官方文档(Hadoop安装目录内)。

 

原文链接: https://www.cnblogs.com/rockeet/archive/2011/10/18/3666890.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

    Hadoop.MapReduce.简介

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/34637

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月8日 上午11:31
下一篇 2023年2月8日 上午11:32

相关推荐