ps:文章最后谈及BulkLoad实践过程中遇到的问题。
下面代码的功能启动一个mapreduce过程,将hdfs中的文件转化为符合指定table的分区的HFile,并调用LoadIncrementalHFiles将它导入到HBase已有的表中
public static class ToHFileMapper
extends Mapper<Object, Text, ImmutableBytesWritable, KeyValue>{
Random random = new Random();
ImmutableBytesWritable oKey = new ImmutableBytesWritable();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
KeyValueBuilder builder = new FileMetaBuilder();
Iterator<KeyValue> keyValues = builder.getKeyValueFromRow(value.toString());
oKey.set(builder.getKey(value.toString()));
while(keyValues.hasNext()) {
KeyValue tmp = keyValues.next();
context.write(oKey, tmp);
}
}
}
public static void run(String fileMetaPath, String table) throws Exception{
String tmpPath = fileMetaPath.trim() + "_" + System.currentTimeMillis();
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", ToHFile.zkQuorum);
Job job = new Job(conf);
job.setJobName(ToHFile.class.getName());
job.setJarByClass(ToHFile.class);
job.setMapperClass(ToHFileMapper.class);
//关键步骤
HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, table));
FileInputFormat.addInputPath(job, new Path(fileMetaPath));
FileOutputFormat.setOutputPath(job, new Path(tmpPath));
job.waitForCompletion(true) ;
conf.set("fs.default.name",ToHFile.hdfsV1Name);
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
load.run(new String[]{tmpPath,table});
FileSystem hdfs = FileSystem.get(conf);
hdfs.delete(new Path(tmpPath),true);
}
细节描述:
-
ToHFileMapper是一个Map过程,它读取一个HDFS文件,并输出key=ImmutableBytesWritable,Value=KeyValue类型的kv数据. KeyValue类型为HBase中最小数据单位,即为一个cell,它由rowKey,family,qualifiers,timestamp,value大小,value值组成,参考下列的可视化输出:
K: 59129_3471712620_1374007953/f:status/1413288274401/Put/vlen=1/ts=0 V: 0
我们都知道HBase中数据是按照KV的格式进行组织和存储,在HBase层面它的key是rowKey,但是HFile层面,这里的key不仅仅是rowKey,参考上面的输出中K, 它由rowKey/family:qualifier/timestamp/类型/vlen=Value的大小/ts组成. 而Value就为对应的值.
我们可以通过KeyValue的API进行设置其中的每个字段的值,从而输出一条cell.注意mysql中一条记录中的每个字段对应HBase中一个cell,所以一条记录会输出多个cell. -
ToHFileMapper输出的Key的类型ImmutableBytesWritable,我们必须设置它的值为该cell的rowKey, 具体原因呢:
我们知道HBase中数据按照rowKey进行划分为多个region,每个region维护一组HFile文件,因此region之间的数据是严格有序,单个region中单个HFile的内部cell也是严格有序, 但单个region中多个HFile之间不要求有序.
这种有序性的要求也是为什么我们可以把一个HFile直接加载到HBase中的原因.对于原始数据,在map阶段将key设置为rowKey,采用特殊的分区的函数, 从而可以实现将属于同一个region的数据发送到同一个reduce,在reduce里面我们按照cell的有序,写入单个HFile中,这样我们就保证了region之间的有序,单个HFile有序性. -
上面我们谈到了根据key=rowKey进行分区,将属于同一个region的数据发送到同一个reduce中进行处理.但是在我们job的配置过程中,我们没有配置reduce,没有配置分区函数 而是通过调用HFileOutputFormat的configureIncrementalLoad函数进行操作,该函数接受一个HBase的Table对象,利于该Table的性质设置job相应的属性;参考下面的源码
public static void configureIncrementalLoad(Job job, HTable table) throws IOException { Configuration conf = job.getConfiguration(); //return org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner Class<? extends Partitioner> topClass = getTotalOrderPartitionerClass(); job.setPartitionerClass(topClass);//设置分区函数 job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat.class);//设置OutPut //设置reduce函数 if (KeyValue.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(KeyValueSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } }
configureIncrementalLoad对Job的分区函数,reducer,output进行设置,因此对原始row数据转换为HFile,仅仅需要配置一个Map就可以了.其中reducer的实现也很简单,代码如下:
protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,Context context) throws IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); for (KeyValue kv: kvs) { map.add(kv.clone()); } int index = 0; for (KeyValue kv: map) { context.write(row, kv); } }
内部维护TreeSet,保证单HFile内部的cell之间有序,进而将他们输出到HFile中.
-
HFile结果输出.上述我们描述了Table,Region,HFile之间关系,其中我们没有对family进行考虑,在每个Region中,Family为管理的最大单位,它为每个rowKey的每个Family 维护一个单独的store(menstore+HFile组成).因此HFile的输出也是按照Family+region进行分开组织的.具体的结构这里就不描述了.
-
输出HFile的目录可以直接作为LoadIncrementalHFiles的参数,再加上一个table参数,就可以直接将目录下的HFile"move"到HBase特定目录下面.代码如下:
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.run(new String[]{tmpPath,table});
本来打算详细写一下LoadIncrementalHFiles的实现,但是通读了一下这块的实现,其实很简单的,首先确认每个HFile该放到哪个region里 (代码实现允许单个大HFile跨多个region,内部会自动对文件进行切割到两个region里), 然后连接每个HFile所对应的regionServer,做server端的文件Move操作.
具体就不写了.
一切就这么简单,就可以大吞吐的将数据导入到HBase中,大幅度的减少HDFS的IO压力.
###运行过程中遇到的关于reduce提前启动的问题
在Hadoop中,mapred.reduce.slowstart.completed.maps默认配置为5%,即在Mapper运行到5%就提前启动reducer过程,之所以这样的设计的主要优点是可以提前启动 reducer的shuffle过程,从而并行提高reduce执行效率。
但是在bulk load过程因为这个而导致性能很差,主要的原因我们hbase启动了预分区为1000,reduce的数目很多,如果预启动reducer,就会出现reducer与mapper进行资源 竞争的情况,从而拖累了整个job的执行。
当然主要的原因是我们hadoop很穷。。。
###基于KeyValue的错误实践
上面的实现方式中,map的输出是基于<ImmutableBytesWritable, KeyValue>,因此每条记录都会有多条map io输出操作,这是一个很严重的性能问题。会导致 shuffle操作的负载很高,所以上面的实践是一个错误。目前我使用的HBase版本为0.94,在configureIncrementalLoad中,我们看到有这样一行代码:
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
}
即HBase本身会针对map的value输出类型不同,而使用不同reduce,而我就傻啦吧唧了使用了第一种KeyValueSortReducer.class。
KeyValueSortReducer.class和PutSortReducer.class的区别是一个put可以由多个rowkey相同的keyvalue组成,可以很大程度上减少map输出。 此时的逻辑如下
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
KeyValueBuilder builder = new FileMetaBuilder();
Put put = builder.getPutFromRow(value.toString());
if(put == null){
return;
}
oKey.set(put.getRow());
context.write(oKey,put);
}
//并配置job的map的key和value类型
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);