Hadoop key 排序问题(降序、升序、二次排序)

一、问题描述:

在日常用Map Reduce处理数据时,会涉及到对key进行排序。
默认时,在Map到Reduce之间的key排序方式都是按照字符串的字段序的。
有可能会有以下一些问题:
1.是否能按照字典序逆序排列?
2.是否能按照整型来升序排序?
3.是否能按照整型来降序排序?
4.是否能按照多个字段来排序?
5.是否能按照多个字段且字段类型也不一样来排序?

二、分析思路:

默认情况下定义的Map的数据输出时key的类型为Text,在到Reduce的各Partitions时,需要按照key的值来比较处理,默认按照字典序来处理。
如果key的类型为IntWriteable,则默认按照整型排序。
限于篇幅本文主要讨论key为Text类型的情况下,如何自定义其排序方式。

数据定义

NUM1
NUM2
NUM3
STR1
STR2
STR3

分析思路

自定义key排序方式,需要了解类RawComparator。
我们可以自定义一个比较排序类OwnTextKeyComparator。

/* Map */
public static class Map extends Mapper<Object, Text, Text, Text>{

	@Override
	public void map(Object key, Text value, Context context) throws IOException,
		InterruptedException{
		int num1 = 0;
		int num2 = 0;
		int num3 = 0;
		String str1 = "";
		String str2 = "";
		String str3 = "";

		// 获取各维度值

		//组织各场景下Map的key与value
		//context.write();
	}
}
/* OwnTextKeyComparator */
public static class OwnTextKeyComparator implements RawComparator<Text>{
	public int compare(Text o1, Text o2) {
		return 0;
	}

	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		// 此处为主要处理位置
		// return 0;
	}
}

使用时,可在任务配置时设置自定义比较方法:

job.setSortComparatorClass(OwnTextKeyComparator.class);

三、问题场景:

场景1

处理字段:字段NUM1
比较方式:整型方式降序比较
处理方式:
Map 中key组织方式:

context.write(new Text(num1 + ""), value);

OwnTextKeyComparator 中compare函数处理:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
	// 分别取得两个key的数据,并转换成整型数,再根据比较结果给出返回值
	byte[] _b1 = Arrays.copyOfRange(b1, s1 + 1, s1 + l1);
	byte[] _b2 = Arrays.copyOfRange(b2, s2 + 1, s2 + l2);
	String t1 = new String(_b1);
	String t2 = new String(_b2);
	int n1 = Integer.parseInt(t1);
	int n2 = Integer.parseInt(t2);

	int result = n1 - n2;
	if(result != 0){
		return -result;		//改成return result; 则为整型升序比较
	}else{
		return 0;
	}
}

场景2

处理字段:字段NUM1、NUM2
比较方式:先根据NUM1整型方式升序比较,再根据NUM2整型方式降序比较。
处理方式:
Map 中key组织方式:

context.write(new Text(num1 + "\t" + num2), value);

OwnTextKeyComparator 中compare函数处理:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
	// 分别取得两个key的数据,并转换成整型数,再根据比较结果给出返回值
	byte[] _b1 = Arrays.copyOfRange(b1, s1 + 1, s1 + l1);
	byte[] _b2 = Arrays.copyOfRange(b2, s2 + 1, s2 + l2);
	String[] t1 = new String(_b1).split("\t");
	String[] t2 = new String(_b2).split("\t");
	int nA1 = Integer.parseInt(t1[0]);
	int nA2 = Integer.parseInt(t1[1]);
	int nB1 = Integer.parseInt(t2[0]);
	int nB2 = Integer.parseInt(t2[1]);

	int compare1 = nA1 - nB1;
	int compare2 = nA2 - nB2;
	if(compare1 != 0){
		return compare1;
	}else{
		if(compare2 != 0){
			return -compare2;
		}else{
			return 0;
		}
	}
}

场景3

处理字段:字段STR1
比较方式:根据STR1字典序降序比较。
处理方式:
Map 中key组织方式:

context.write(new Text(str1), value);

OwnTextKeyComparator 中compare函数处理:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
	// 分别取得两个key的数据,根据比较结果给出返回值
	byte[] _b1 = Arrays.copyOfRange(b1, s1 + 1, s1 + l1);
	byte[] _b2 = Arrays.copyOfRange(b2, s2 + 1, s2 + l2);
	String t1 = new String(_b1);
	String t2 = new String(_b2);

	return -t1.compareTo(t2);
}

场景4

处理字段:字段STR1、STR2
比较方式:先根据STR1字典序升序比较,再根据STR2字典序降序比较。
处理方式:
Map 中key组织方式:

context.write(new Text(str1 + "\t" + str2), value);

OwnTextKeyComparator 中compare函数处理:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
	// 分别取得两个key的数据,根据比较结果给出返回值
	byte[] _b1 = Arrays.copyOfRange(b1, s1 + 1, s1 + l1);
	byte[] _b2 = Arrays.copyOfRange(b2, s2 + 1, s2 + l2);
	String[] t1 = new String(_b1).split("\t");
	String[] t2 = new String(_b2).split("\t");
	String sA1 = t1[0];
	String sA2 = t1[1];
	String sB1 = t2[0];
	String sB2 = t2[1];

	int compare1 = sA1.compareTo(sB1);
	int compare2 = sA2.compareTo(sB2);
	if(compare1 != 0){
		return compare1;
	}else{
		if(compare2 != 0){
			return -compare2;
		}else{
			return 0;
		}
	}
}

场景5

处理字段:字段STR1、NUM1
比较方式:先根据STR1字典序升序比较,再根据NUM1整型方式降序比较。
处理方式:
Map 中key组织方式:

context.write(new Text(str1 + "\t" + num1), value);

OwnTextKeyComparator 中compare函数处理:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
	// 分别取得两个key的数据,根据比较结果给出返回值
	byte[] _b1 = Arrays.copyOfRange(b1, s1 + 1, s1 + l1);
	byte[] _b2 = Arrays.copyOfRange(b2, s2 + 1, s2 + l2);
	String[] t1 = new String(_b1).split("\t");
	String[] t2 = new String(_b2).split("\t");
	String sA1 = t1[0];
	int nA2 = Integer.parseInt(t1[1]);
	String sB1 = t2[0];
	int nB2 = Integer.parseInt(t2[1]);

	int compare1 = sA1.compareTo(sB1);
	int compare2 = nA2-nB2;
	if(compare1 != 0){
		return compare1;
	}else{
		if(compare2 != 0){
			return -compare2;
		}else{
			return 0;
		}
	}
}

四、小结:

本文简单探讨在Map、Reduce计算过程中,如何按照计算需求调整对应key的比较方式。
实际中有可能需要字段按照字典序或整型排序(升序或者降序),也可能需要进行二次排序(见《Hadoop 二次排序》)。
另外,在处理大量数据时,不可避免会有字段值丢失或者其他异常情况存在,所以对应逻辑需要能处理各种异常情况

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>