mapreduce传递全局变量(MapReduce之Shuffle自定义对象排序已经Combiner)

1. Shuffle:

MapReduce的计算模型主要分为三个阶段,Map, shuffle, Reduce。 Map负责数据的过滤,将文件中的数据转化为键值对,Reduce负责合并将具有相同的键的值进行处理合并然后输出到HDFS。 为了让Reduce可以并行处理map的结果,必须对Map的输出进行一定的排序和分割,然后交个Reduce,这个过程就是Shuffle。

官方给的图如下:

mapreduce传递全局变量(MapReduce之Shuffle自定义对象排序已经Combiner)(1)

在这里插入图片描述

上图Map和Reduce之间的就是shuffle,但是猛地一看就是云里雾里的,倒不如下面这个图清楚:

mapreduce传递全局变量(MapReduce之Shuffle自定义对象排序已经Combiner)(2)

在这里插入图片描述

Map端的shuffle简单来说就是对map的结果进行分区缓存,当缓存不够的时候进行溢写,在溢写的过程中,排序写入到文件,每一次溢写是一个文件,最后将这些文件合并成一个文件。 分区排序的意思是相同partition的键值对存储在一起,partition之间是有序有的,每一个partition中的键值对也是有序的,默认是升序。

(1) 缓冲区

Map的输出结果不是直接写到文件的,是先写到缓存区中,缓存区是一个环形结构,是用环形缓存区的目的是尽可能高效的利用内存空间,默认大小是100M,可以通过参数调整缓冲区的大小。如下图:

mapreduce传递全局变量(MapReduce之Shuffle自定义对象排序已经Combiner)(3)

在这里插入图片描述

这个缓冲区其实是一个字节数组叫做kvbuffer,kvbuffer不只有数据键值对,还有数据的索引叫做kvmeta。 1byte[] kvbuffer; // main output buffer 2private static final int VALSTART = 0; // val offset in acct 3private static final int KEYSTART = 1; // key offset in acct 4private static final int PARTITION = 2; // partition offset in acct 5private static final int VALLEN = 3; // length of value

索引和数据的放在不同的两个区域,用一个分界点来划分,这个分界点不是一层不变的,会随着每次的溢写而改变,初始的位置为0,数据向上增长,索引向下增长。上图中的buindex是数据的位置索引,一直向上增长,比如初始值为0,写入一个int的key之后变为4,再写入一个int的Value之后变为8.

索引的区记录的是数据键值对的位置,是一个四元组占用4个Int长度:

value的起始位置,key的起始位置,partition的值以及Value的长度。 索引的写入是kvindex每次向下跳4个字节,然后再向上填充数据,比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex 0)的位置存放value的起始位置、(Kvindex 1)的位置存放key的起始位置、(Kvindex 2)的位置存放partition的值、(Kvindex 3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

kvbuffer 的默认大小为100M,当然可以自己设置:

1public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb"; 2final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);

当缓存区达到一定的比例之后,一个后台线程开始把缓存区的数据写入到磁盘,这个写入的过程叫做Spill,即溢写。开始Spill的比例默认是0.8,这个比列可以通过mapreduce.map.sort.spill.percent配置,在后台溢写的同时,map继续向这个剩余的缓存中继续写入数据,写入数据的起始位置是剩余空间的中间,分别向两边写入索引和数据,如果缓存区满了溢写还没与完成的话,map会阻塞直到Spill完成。

spill的比列默认是0.8 也是可以设置的:

1public static final String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent"; 2final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

分区是在写入缓存的时候完成的,看了很多博客说是在溢写的时候进行的分区,感觉不是很对。想想也能明白,既然索引中要写入数据了,实在是没必要溢写的时候补上,并且缓存总放到都是byte数组,来回转换不也是麻烦。我看了看源码确实是在写入缓存的时候进行的分区:

1@Override 2public void collect(K key, V value) throws IOException { 3 try { 4 collector.collect(key, value, partitioner.getPartition(key, value, numPartitions)); 5 } catch (InterruptedException ie) { 6 Thread.currentThread().interrupt(); 7 throw new IOException("interrupt exception", ie); 8 } 9}

其中 partitioner是自定义分区或者默认分区,默认分区的会就一个为0,后一个numPartitions 实际上为MapReduceTask的个数:

1NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, 2 JobConf job, 3 TaskUmbilicalProtocol umbilical, 4 TaskReporter reporter 5 ) throws IOException, ClassNotFoundException { 6 collector = createSortingCollector(job, reporter); 7 partitions = jobContext.getNumReduceTasks(); 8 if (partitions > 1) { 9 partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) 10 ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); 11 } else { 12 partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { 13 @Override 14 public int getPartition(K key, V value, int numPartitions) { 15 return partitions - 1; //默认情况下 numPartitions 为1 所以返回的是0 16 } 17 }; 18 } 19} 20其中getNumReduceTasks()为: 21public static final String NUM_REDUCES = "mapreduce.job.reduces"; 22public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }

很多博客说分区的数量决定了reduceTak的数量,但是看源码知道不是这么回事,ReduceTask的数量是由自己定义的,默认是1,只有一个ReduTask,如果定了多个task,但是自定义分区的返回值超过了task的数量,则会抛异常:

1if (partition < 0 || partition >= partitions) { 2 throw new IOException("Illegal partition for " key " (" 3 partition ")"); 4}

定义 Reducetask的数量有两种方法 一个是在main法中的job定义

1job.setNumReduceTasks(2);

还有就是配置文件,配置文件是指mapper-default.xml

1name的值为mapreduce.job.reduces

如果定义了ReduceTask的个数,却没有之定义分区的话,默认使用的hash 取模的算法:

1public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class"; 2public Class<? extends Partitioner<?,?>> getPartitionerClass() 3 throws ClassNotFoundException { 4 return (Class<? extends Partitioner<?,?>>) 5 conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); 6} 7------------ 8public class HashPartitioner<K, V> extends Partitioner<K, V> { 9 10 /** Use {@link Object#hashCode()} to partition. */ 11 public int getPartition(K key, V value, 12 int numReduceTasks) { 13 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 14 } 15 16}

(2) Spill: 溢写

前面已经说了当环形缓存区的达到一定的时候进行溢写。溢写由单独的线程完成,不耽误mapTask的执行:

1final Condition spillReady = spillLock.newCondition(); 2private void startSpill() { 3 assert !spillInProgress; 4 kvend = (kvindex NMETA) % kvmeta.capacity(); 5 bufend = bufmark; 6 spillInProgress = true; 7 spillReady.signal(); 8}

上面代码中spillRead.signal()的意思是唤醒线程,Condition 是语言级别的等待唤醒机制,与object中的wait/notify意思相同,但是更可控。

唤醒Spill线程之后,首先执行的是排序。 排序的规则是是按照缓存中的数据的partition和key进行排序,移动的只是索引数据,排序的结果是先按照分区,分区相同的按照key排序,key的排序规则我们可以自定义。排序过程中使用的算法是快排,当然如果不想用快排我们也可以定义自己的排序规则,排序完之后就是Spill。 在Spill 之前如果设置了Combiner,则先调用Combiner,源码如下:

1sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); 2for (int i = 0; i < partitions; i) { 3 IFile.Writer<K, V> writer = null; 4 if (combinerRunner == null) { 5 // spill directly 6 DataInputBuffer key = new DataInputBuffer(); 7 while (spindex < mend && 8 kvmeta.get(offsetFor(spindex % maxRec) PARTITION) == i) { 9 final int kvoff = offsetFor(spindex % maxRec); 10 int keystart = kvmeta.get(kvoff KEYSTART); 11 int valstart = kvmeta.get(kvoff VALSTART); 12 key.reset(kvbuffer, keystart, valstart - keystart); 13 getVBytesForOffset(kvoff, value); 14 writer.append(key, value); 15 spindex; 16 } 17 } else { 当combiner不为空 18 int spstart = spindex; 19 while (spindex < mend && 20 kvmeta.get(offsetFor(spindex % maxRec) 21 PARTITION) == i) { 22 spindex; 23 } 24 // Note: we would like to avoid the combiner if we've fewer 25 // than some threshold of records for a partition 26 if (spstart != spindex) { 27 combineCollector.setWriter(writer); 28 RawKeyValueIterator kvIter = 29 new MRResultIterator(spstart, spindex); 30 combinerRunner.combine(kvIter, combineCollector); 31 } 32 } 33}

上面只是截取了一部分。 Combiner本质上是一个Reduce,对结果进行预处理,先在map端对结果进行一次合并,以减少map 和reduce之间的数据的传输量,提高网络IO性能,是Mapreduce的一种优化手段,但并不是所有的mapReduce的结果都适合设置Combiner,设置Combiner的原则是在不改变Reduce最终结果的前提下。比如说网上说的求和或者最大值可以设置Combiner,但是求平均值就不行,原因也很简单,因为在map端进行了一次和并,如果求平均值的话,map端先求了一次平均值,到Reduce端的值就是每一个map端求平均值之后的平均值了,那么怎么可能最终的平均值不受影响呢!

开始溢写的时候先创建溢写文件,文件名字类似spill1.out,有个变量记录溢写的次数,文件名每次溢写累加1,溢写的过程中以此按照kvmeta中的partition一次写到一个文件中,但是在文件中怎么知道每一个partition呢,MapReduce中利用了索引,索引中记录了每个partition的位置,长度还有压缩之后的长度,刚开始是记录在内存中的,当达到了一定的内存(默认为1M),则写入index文件:

1private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; 2public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes"; 3indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,INDEX_CACHE_MEMORY_LIMIT_DEFAULT); 4 5//达到最大值 6if (totalIndexCacheMemory >= indexCacheMemoryLimit) { 7 // create spill index file 8 Path indexFilename = 9 mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions 10 * MAP_OUTPUT_INDEX_RECORD_LENGTH); 11 spillRec.writeToFile(indexFilename, job); 12} else { 13 indexCacheList.add(spillRec); 14 totalIndexCacheMemory = 15 spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; 16}

文件的名字是spillxx.out.index,所以每次溢写至少有一个index文件和一个out文件:

1@Override 2public Path getSpillIndexFileForWrite(int spillNumber, long size) 3 throws IOException { 4 return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT "/spill" 5 spillNumber ".out.index", size, getConf()); 6}

map任务每次的执行的最终结果都要写到磁盘上,哪怕最后的结果不足与超过上面的0.8,因为后续的Reduce需要拉去数据进行Reduce任务。

当多次spill之后,会产生多个溢写文件,当map任务执行完之后需要合并当前maptask所产生的溢写文件:

merge的方法家叫做mergeParts() 在MapTask文件中,merge首先读取本地所有的的index文件,将index文件加载到内存中,按照partition一次读写所有index文件这个partition的索引信息,每一个partition的索引信息分装成一个Segment,然后对这个partition的所有Segment进行合并,最终合并成一个Segment。具体的merge方法是Merge.java中有个静态类叫做MergeQueue,其中有一个方法叫做的merge方法。规则是分批对Segment进行合并,就是先取出第一批进行合并,,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。最终的索引数据仍然输出到Index文件中。最终输出的文件叫做file.out,index文件叫做file.out.index文件。

(3)ReduceShuffle

ReduceShuffle 首先拉取Map端输出的数据,可能会首先copy到内存在内存中合并,也可能是直接copy到硬盘,视情况而定,判定情况如下:

1private boolean canShuffleToMemory(long requestedSize) { 2 return (requestedSize < maxSingleShuffleLimit); 3}

首先说内存中的合并,Reduce要向每一个map拉取数据放到内存,当内存占到一定的比列的时候,开始merge数据,merge完之后把数据写到磁盘,如果设置了Combiner的话,name先调用combine,源码可以看下MergeManagerImpl类 中的startMerge这个方法,当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块, 之后就是调用reduce了。

2. 序列化 :

如果我们我们要定义自己的数据类型的那话,那就需要我们自己实现了,MapReduce中数据的传输都要实现Writeable。很简单,如下

1public class Student implements Writable { 2 private String name; 3 private int age; 4 public void write(DataOutput out) throws IOException { 5 out.writeUTF(name); 6 out.writeInt(age); 7 8 } 9 public void readFields(DataInput in) throws IOException { 10 11 this.name = in.readUTF(); 12 this.age = in.readInt(); 13 }

主要是实现write和readFields这两个方法,一个写一个读。 但是这样是不够的,这样的话自定义的这个对象只能作为value来传递,如果作为key的话,那肯定不行的,这是因为看源码:

1public RawComparator getOutputKeyComparator() { 2 Class<? extends RawComparator> theClass = getClass( 3 JobContext.KEY_COMPARATOR, null, RawComparator.class); 4 if (theClass != null) 5 return ReflectionUtils.newInstance(theClass, this); 6 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); 7} 8public <U> Class<? extends U> asSubclass(Class<U> clazz) { 9 if (clazz.isAssignableFrom(this)) 10 return (Class<? extends U>) this; 11 else 12 throw new ClassCastException(this.toString()); 13}

看 上面的代码,如果没有实现WritableComparable这个类的话,就会爆ClassCastException的异常了,因为后面的排序需要用,所以必须实现。不过看上面的代码, if (theClass != null) 这个,如果这个不为空的话也是可以的,而这个类就是我们在main方法里面设置的自定义排序的排序:

1job.setSortComparatorClass();

那么这个WritableComparable是什么呢?

1@InterfaceAudience.Public 2@InterfaceStability.Stable 3public interface WritableComparable<T> extends Writable, Comparable<T> { 4}

看上面的源码,他实现了Writable,所以我们如果想让自定义的对象作为key,那么实现这个接口就可以了,如下:

1public class Student implements WritableComparable<Student> { 2 private String name; 3 private int age; 4 public void write(DataOutput out) throws IOException { 5 out.writeUTF(name); 6 out.writeInt(age); 7 } 8 public void readFields(DataInput in) throws IOException { 9 this.name = in.readUTF(); 10 this.age = in.readInt(); 11 } 12 public int compareTo(Student o) { 13 return age - o.age; 14 } 15}

3. 排序:

在MapReduce的输出中都会默认排序,排序规则是按照字典顺序。 当然我们可以自定排序规则,上面的自定义对象已经说了。

如果是基本类型的排序呢? 比如说 IntWritable 这个类,他的默认排序规则是升序。我如果想让他降序呢?其实也很简单,每一个基本类型中都有这么一个静态类:

1public static class Comparator extends WritableComparator { 2 public Comparator() { 3 super(IntWritable.class); 4 } 5 6 @Override 7 public int compare(byte[] b1, int s1, int l1, 8 byte[] b2, int s2, int l2) { 9 int thisValue = readInt(b1, s1); 10 int thatValue = readInt(b2, s2); 11 return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); 12 } 13} 14 15static { // register this comparator 16 WritableComparator.define(IntWritable.class, new Comparator()); 17}

所以我们只要继承这个类,并比较的时候返回数取反即可:

1public class MyCompare extends IntWritable.Comparator { 2 3 @Override 4 public int compare(Object a, Object b) { 5 return - super.compare(a, b); 6 } 7}

当然你需要在main方法中的job上设置自定义的比较器:

1job.setSortComparatorClass(MyCompare.class);

3. 分区:

分区的目的是将不同的内容放到不同的文件中,也可以加快处理速度,毕竟每一个分区对应不同的Reduce,但是需要注意的是返回的分区数不能大于设置的reduceTask的个数,上面的shuffle有提到原因:

1public class MyPartitioner extends Partitioner<Text, IntWritable> { 2 3 public int getPartition(Text text, IntWritable intWritable, int numPartitions) { 4 5 int hashcode = text.hashCode(); 6 System.out.println("text = " text " ; hascode " hashcode " ; model = " hashcode % 3); 7 8 return hashcode % 3; 9 } 10} 11 12job.setPartitionerClass(MyPartitioner.class);

最后一个参数numPartitions 表示的是最大分区个数,也就是reduceTask的值,所以不要超过它。

  1. Combiner
  2. Combiner虽然本质上是一个reduce,但是没有默认的实现,需要自己定义并且在job中设置才可以。Combiner的作用是先做一次本地的合并,减少网络之间的传输量。但是并不是所有的输出都适合使用Combiner,只有那些不会改变最终结果的才适合使用。
  3. 使用它其实很简单,直接继承Reduce即可。

参考文档:

https://blog.csdn.net/bingduanlbd/article/details/51933914

https://blog.csdn.net/u014374284/article/details/49205885

欢迎关注我的公众号: 北风中独行的蜗牛。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页