第四章是介绍Hadoop的IO系统,前面介绍的是数据完整性的方案,无外乎就是校验和等机制,接着介绍的是hadoop中的压缩机制,这两块内容后续仔细阅读。
串行化:
Hadoop的进程间通信和持久化保存的时候都要用到串行化的技术。首先看一下Hadoop的进程间通信的机制。Hadoop中进程间通信时通过自己的一套RPC机制来实现的,节点间的通信格式由protocal来决定。
Hadoop中并没有使用Java的串行化机制,而使使用自己的一套技术。
所有可串行化类的都继承自Writable接口。hadoop提供了很多可串行化的类型,如IntWritable,Text,DoubleWritable等。每个类中都有一个内部类Comparator。在MapReduce过程中排序是很重要的操作,而排序的比较方法就是有这个内部类提供的。WritableComparator是Comparator内部类的基类,它提供了默认的比较方法的实现和提供一个工厂方法,可以根据不同的类型返回不同的WritableComparator实现类。
我们看一下WritableComparator的compare函数实现:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
return compare(key1, key2); // compare them
}
可见默认的compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)操作是先将流中的数据反序列化,然后调用compare(WritableComparable a, WritableComparable b)函数,compare(WritableComparable a, WritableComparable b)调用对象的compareTo函数进行比较。
再看一下IntWritable中的Comparator内部类,它复写了ompare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法:
public static class Comparator extends WritableComparator {
public Comparator() {
super(IntWritable.class);
}
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int thisValue = readInt(b1, s1);
int thatValue = readInt(b2, s2);
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
}
可见compare并没有反序列化,而是直接以字节的形式进行比较,从而提高了比较操作的效率。
类型VIntWritable:
VIntWritable是变长的可序列化整形,他的原理是用第一个字节的第4位(从右向左数)来表示这个数的正负,0为负1为正(注意跟正常的编码不一样),用后三位来表示这个数占几个字节(算上第一个字节)。10001111 - 10001000,从右数第四位是1,则表示这个数是正的,后三位111-000分别表示2-9.;再看10000111 - 10000000,从右数第四位是0,表示负数,后三位111-000分别表示2-9个字节。
比如书上的例子163,00000000 00000000 00000000 10100011,算上第一个字节,二进制至少需要两个字节存放,所以序列化写入流的时候第一个字节为8f(10001111)——第四位为1,表示正数,前三位111表示2个字节。所以163在内存中用两个字节就可以存下:8fa3。具体操作可以参见源码。
类型Text:
Hadoop中的Text是UTF-8编码,这跟Java的String不一样(Java的String是unicode编码)。而且Text会用一个VIntWritable来记录整个Text有多少个字节(不算这个记录的VIntWritable的长度)。
类型ByteWritable:
ByteWritable在序列化的时候会用4个字节表示后面数据占几个字节,比如byte[]{2, 3}在byteWritable中存储形式是:000000020203。
实现自己的类型:
由于Writable是MapReduce过程中的核心类型,所以有的时候我们要实现自己的类型,下面用书中的例子来说明一下:
package com.test.hadoop3;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getSecond() {
return second;
}
public void setSecond(Text second) {
this.second = second;
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public int compareTo(TextPair o) {
int cmp = first.compareTo(o.first);
if (cmp != 0)
return cmp;
return second.compareTo(o.second);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TextPair) {
TextPair tp = (TextPair) obj;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public String toString() {
return first + "\t" + second;
}
}
我们实现的TextPair类型有两个Text组成,readFields和write是序列化相关,所以必须实现。hashCode方法在HashPartitioner做分区的时候会用到,而toString函数,在MapReduce利用TextInputFormat进行输出时,会调用key和value的toString进行输出,所以必须正确的是实现。TextPair实现了WritableComparable接口,所以必须实现compareTo方法。
对于我们刚刚实现的类型,我们并没有提供其他类型中含有的Comparator内部类,所以默认的比较行为是反序列化成对象,然后调用对象的compareTo方法进行比较。我们可以利用Comparator内部类对这个比较过程进行优化,优化的方法是在比较过程中不反序列化对象,而是直接以二进制进行比较。
private static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() {
super(TextPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
// 计算first有多少个字节,Text会有一个VIntWritable来记录first总共有多少个字节,所以长度是这个VIntWritable的长度+first总共有多少个字节
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
static {
WritableComparator.define(TextPair.class, new Comparator());
}
分享到:
相关推荐
Hadoop The Definitive Guide 2nd Edition.pdf
Hadoop The Definitive Guide 2nd Edition
Hadoop The Definitive Guide 2nd Edition-指南第二版 这个是Hadoop指南英文第二版,高清文字版本,可以复制文字内容。
hadoop指南第二版 hadoop指南2 Hadoop The Definitive Guide 2nd Edition
书名:Hadoop The Definitive Guide 语言:英文 The rest of this book is organized as follows. Chapter 2 provides an introduction to MapReduce. Chapter 3 looks at Hadoop filesystems, and in particular ...
Hadoop The Definitive Guide, 4th Edition.pdf(Hadoop权威指南第4版英文版O'REILLY)
The fourth edition covers Hadoop 2 exclusively. The Hadoop 2 release series is the current active release series and contains the most stable versions of Hadoop. There are new chapters covering YARN ...
Hadoop The Definitive Guide 3rd EditionHadoop The Definitive Guide 3rd Edition
Hadoop The Definitive Guide, 2nd Edition 第二版来了原版,下载吧
Hadoop The Definitive Guide, 4th Edition
1449311520 Hadoop The Definitive Guide 3rd Edition 第三版全,2012、5月
Hadoop The Definitive Guide 3rd Edition Storage and Analysis at Internet Scale By Tom White May 2012 Pages: 688
Hadoop The Definitive Guide (4th Edition)
OReilly.Hadoop.The.Definitive.Guide.June.2009.RETAiL.eBOOk-sUppLeX Description Apache Hadoop is ideal for organizations with a growing need to process massive application datasets. Hadoop: The ...