Hadoop 二次排序

一、问题描述:

在用hadoop进行数据处理时,我们往往需要按照key来对日志进行排序处理。
假定用户访问日志部分字段如下:

userId
visitTime
ip
userAgent

问题:
按照ip进行排序,并统计每个ip上的用户数及每个用户的访问时间分布(24小时)。
注:每个ip上用户数每个用户24小时分布,可用多路径方式输出,本文不对此做详细描述。

二、分析思路:

1.map输出时key、value设置

key: ip + “\t” + userId + “\t” + visitTime         //后续代码中这块用到类Tuple
value: ip + “\t” + userId + “\t” + visitTime
说明:
在Mapper到Reducer之前,key中ip主要作用是将相同的ip映射到同一个partition中,再根据userid 和visitTime对不同的日志进行排序。

在Reducer阶段中,一个partition中可能会有多个不同的ip取值,所以在value中要给出ip等维度值。

2.reduce循环处理问题

由于被分到同一partition的日志可能存在不同的ip,且处理时可能涉及多层输出。故在for循环结束的时候仍需要判断最后一个ip和userId是否已经处理。

三、参考代码:

1.Mapper 参考代码:

public static class Map extends Mapper<Object, Text, Tuple, Text>{
	@Override
	public void map(Object key, Text value, Context context) throws IOException,
		InterruptedException{
		// userid visittime ip useragent
		String[] items = value.toString().split("\t");
		userId = items[0];
		visitTime = items[1];
		ip = items[2];
		userAgent = items[3];
		Tuple tuple = new Tuple(3);
		tuple.set(0, ip);
		tuple.set(1, userId);
		tuple.set(2, visitTime);
		context.write(tuple,
				new Text(ip + "\t" + userId + "\t" + visitTime));
	}
}

2.Reducer 参考代码:

public static class Reduce extends Reducer<Tuple, Text, NullWritable, Text>{
	@Override
	public void map(Object key, Text value, Context context) throws IOException,
		InterruptedException{
		// 临时变量
		// 上行数据ip值
		String preIp = " ";	// 假定正常用户值不会出现" "

		// 上行数据userid值 	// 假定正常用户值不会出现"-1"
		String preUserId = "-1";

		// 当前ip上用户数
		long userCount = 0;

		// 当前用户 24小时访问次数统计
		int[] visitTimeArray = new int[24];
		for(int i = 0; i< 24; i++){
			visitTimeArray[i] = 0;
		}

		// 依次处理每行日志
		for (Text value : values) {
			String[] items = value.toString().split("\t");
			ip = items[0];
			userId = items[1];
			visitTime = items[2];

			// 判断并处理
			if(preIp.equals(ip)){
				if(userid.equals(preUserId)){
					// 更新当前用户的24小时统计
				}else{
					// 输出前一用户对应24小时访问数据
					// 设置 visitTimeArray
					// 更新 userCount
				}
			}else{
				if(!preIp.equals(" ")){
					// 输出当前ip信息
				}

				// 设置 visitTimeArray userCount
			}

			preIp = ip;
			preUserId = userId;

		}// end of for

		// 处理最后一个ip和userId
		if(!preIp.equals(" ") && preUserId.equals("-1")){
			// 输出当前userId信息
			// 输出当前ip信息
		}
	}
}

3.Mapper 到 Reducer时 key映射处理:

类GroupingComparator主要用于不同key值的比较排序,即依次按照 ip、userId、visitTime等排序。
类MyPartitioner主要是将选定的key的子集来映射partition,比如将相同的ip(key的第一个元素)放到同一个partition中。
使用时在main函数中如下设置即可:

//分区
job.setPartitionerClass(MyPartitioner.class);
//分组
job.setGroupingComparatorClass(GroupingComparator.class);

上述两个类的参考代码如下:

public class MyPartitioner extends Partitioner<Tuple, Text>{
	@Override
	public int getPartition(Tuple key, Text value, int numPartitions){
		Object object = new Object();
		object = key.get(0).toString();    // 第一个元素即 ip
		return Math.abs(object.hashCode() & Integer.MAX_VALUE) % numPartitions;
	}
}
public class GroupingComparator implements RawComparator {

	public int compare(Tuple arg0, Tuple arg1) {
		return arg0.getString(0).compareTo(arg1.getString(0));
	}
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		return 0;
	}
}

四、小结:

本文主要讨论在Hadoop中如何进行二次排序,文中描述仅为一种可行方案。若在Reducer阶段需要针对详细日志进行统计或处理,需要在for循环处理结束时考虑最后一个取值(或组合取值)是否已处理。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>