Word Count with Hadoop 2.4


#BigData


2014-11-17

For how to set up Hadoop 2.4, see "Hadoop 2.4.1 single-node pseudo-distributed installation and configuration".

Suppose we have two text files, t1.txt and t2.txt, with the following contents:

t1.txt:

Sign up for GitHub. By clicking "Sign up for GitHub", you agree to our terms of service and privacy policy. We will send you account related emails occasionally

t2.txt:

and and  you 

Problem 1: how do we count the words in t1.txt?

Problem 2: how do we count the words across both t1.txt and t2.txt?

Create a directory and put the text files into HDFS

zsh >> $HADOOP_PREFIX/bin/hadoop fs -mkdir /input/
zsh >> $HADOOP_PREFIX/bin/hadoop fs -put t1.txt /input
zsh >> $HADOOP_PREFIX/bin/hadoop fs -put t2.txt /input

Counting the words in t1.txt

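Open Eclipse, create a project named WordCount, and add hadoop-2.4.1/share/hadoop/common/hadoop-common-2.4.1.jar and hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar to the build path. Then create WordCount.java, paste in the wordcount source that ships with Hadoop, and modify it slightly: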

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println("start...");
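		// Picks up the Hadoop configuration files found on the classpath.
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		// The reducer doubles as a combiner to pre-sum counts on the map side.
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		// Output key/value types (also used for the map output here).
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// Input and output locations are hard-coded HDFS paths.
		FileInputFormat.addInputPath(job, new Path("/input/t1.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/output"));
		// Block until the job finishes; exit 0 on success, 1 on failure.
		System.exit(job.waitForCompletion(true) ? 0 : 1);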
	}
}

Let me explain this code. The TokenizerMapper class extends Mapper<Object, Text, Text, IntWritable>, which is declared as follows:

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends Object

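KEYIN is the type of the keys in the map input, VALUEIN is the type of the values in the map input, KEYOUT is the type of the keys that map emits, and VALUEOUT is the type of the values that map emits.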

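The map function of TokenizerMapper uses the StringTokenizer class, which by default splits a piece of text on whitespace such as spaces and newlines. It is fairly basic (and not very practical for real text). Here is an example: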

import java.util.StringTokenizer;

public class Test {
	public static void main(String[] args) {
		StringTokenizer itr = new StringTokenizer("hello world! hi\nhadoop");
		while (itr.hasMoreTokens()) {
			System.out.println(itr.nextToken());
		}
	}
}

Running it prints:

hello
world!
hi
hadoop

I won't walk through the IntSumReducer class or the main() function. One thing to note: main() sets the input file to /input/t1.txt and writes the MapReduce result to the /output directory.
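For a rough feel of what the mapper and reducer do together, here is a minimal sketch in plain Java with no Hadoop dependency. The class name LocalWordCount and its two methods are made up for illustration; they only imitate the data flow (emit a (word, 1) pair per token, then group by key and sum), not Hadoop's actual execution.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical, in-memory imitation of the WordCount data flow.
class LocalWordCount {

	// Imitates TokenizerMapper.map(): emit a (word, 1) pair for every token.
	static List<Map.Entry<String, Integer>> map(String line) {
		List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
		StringTokenizer itr = new StringTokenizer(line);
		while (itr.hasMoreTokens()) {
			pairs.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
		}
		return pairs;
	}

	// Imitates the grouping step plus IntSumReducer.reduce():
	// collect the pairs by key and add up their values.
	static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
		Map<String, Integer> counts = new TreeMap<>();
		for (Map.Entry<String, Integer> pair : pairs) {
			counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
		}
		return counts;
	}

	public static void main(String[] args) {
		// Uses the contents of t2.txt as input.
		System.out.println(reduce(map("and and  you ")));
	}
}
```

Run on the contents of t2.txt, this prints {and=2, you=1}.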

After exporting the project as WordCount.jar, run:

$HADOOP_PREFIX/bin/hadoop jar WordCount.jar WordCount

Once the job finishes, look at the reduce output:

zsh >> $HADOOP_PREFIX/bin/hadoop fs -cat /output/part-r-00000
"Sign	1
By	1
GitHub",	1
GitHub.	1
Sign	1
We	1
account	1
agree	1
and	1
clicking	1
emails	1
for	2
occasionally	1
of	1
our	1
policy.	1
privacy	1
related	1
send	1
service	1
terms	1
to	1
up	2
will	1
you	2

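The result is correct. The punctuation stuck to some words and the case sensitivity (for example "Sign and Sign being counted separately) can be cleaned up as needed.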
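One possible cleanup, sketched here in plain Java so it can be tried without a cluster: lower-case each token and strip non-alphanumeric characters from both ends before counting. TokenNormalizer and its methods are hypothetical names, not part of Hadoop; in the job itself, the same normalize() call would presumably go just before word.set(...) in the mapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringTokenizer;

// Hypothetical helper for cleaning tokens before they are counted.
class TokenNormalizer {

	// Lower-case the token and strip leading/trailing characters
	// that are neither letters nor digits.
	static String normalize(String token) {
		return token.toLowerCase(Locale.ROOT)
				.replaceAll("^[^\\p{L}\\p{N}]+|[^\\p{L}\\p{N}]+$", "");
	}

	// Split on whitespace, normalize, and drop tokens that end up empty.
	static List<String> tokens(String line) {
		List<String> out = new ArrayList<>();
		StringTokenizer itr = new StringTokenizer(line);
		while (itr.hasMoreTokens()) {
			String word = normalize(itr.nextToken());
			if (!word.isEmpty()) {
				out.add(word);
			}
		}
		return out;
	}

	public static void main(String[] args) {
		System.out.println(tokens("\"Sign up for GitHub\", you agree."));
	}
}
```

This prints [sign, up, for, github, you, agree]. Punctuation inside a token (as in "don't") is left alone, which may or may not be what you want.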

Counting the words in t1.txt and t2.txt

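First delete the /output directory, since the job fails if the output directory already exists (in Hadoop 2.x, -rm -r replaces the deprecated -rmr):

$HADOOP_PREFIX/bin/hadoop fs -rm -r /output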

In the main() function of WordCount.java above, replace

FileInputFormat.addInputPath(job, new Path("/input/t1.txt"));

with:

FileInputFormat.addInputPath(job, new Path("/input/t1.txt"));
FileInputFormat.addInputPath(job, new Path("/input/t2.txt"));

or with:

FileInputFormat.setInputPaths(job, "/input");

The MapReduce result is now:

zsh >> $HADOOP_PREFIX/bin/hadoop fs -cat /output/part-r-00000
"Sign	1
By	1
GitHub",	1
GitHub.	1
Sign	1
We	1
account	1
agree	1
and	3
clicking	1
emails	1
for	2
occasionally	1
of	1
our	1
policy.	1
privacy	1
related	1
send	1
service	1
terms	1
to	1
up	2
will	1
you	3

The counts for "and" and "you" are now both 3, so the result is correct.
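The totals can be cross-checked by hand. Because main() registers IntSumReducer as the combiner as well as the reducer, counts may be pre-summed on the map side and then summed again in the reducer. The plain-Java sketch below imitates that, assuming for illustration that each file is handled by its own map task; CombinerSketch and its methods are hypothetical names, not Hadoop APIs.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical illustration of partial sums (combiner) merged by a reducer.
class CombinerSketch {

	// Local word counts for one input, as a combiner might produce them.
	static Map<String, Integer> partialCounts(String text) {
		Map<String, Integer> counts = new TreeMap<>();
		StringTokenizer itr = new StringTokenizer(text);
		while (itr.hasMoreTokens()) {
			counts.merge(itr.nextToken(), 1, Integer::sum);
		}
		return counts;
	}

	// Adds two sets of partial counts key by key, as the reducer would.
	static Map<String, Integer> mergeCounts(Map<String, Integer> a,
			Map<String, Integer> b) {
		Map<String, Integer> total = new TreeMap<>(a);
		for (Map.Entry<String, Integer> e : b.entrySet()) {
			total.merge(e.getKey(), e.getValue(), Integer::sum);
		}
		return total;
	}

	public static void main(String[] args) {
		String t1 = "Sign up for GitHub. By clicking \"Sign up for GitHub\", "
				+ "you agree to our terms of service and privacy policy. "
				+ "We will send you account related emails occasionally";
		String t2 = "and and  you ";
		Map<String, Integer> total = mergeCounts(partialCounts(t1), partialCounts(t2));
		System.out.println("and=" + total.get("and") + ", you=" + total.get("you"));
	}
}
```

This prints and=3, you=3: t1.txt contributes and=1 and you=2, while t2.txt contributes and=2 and you=1. Summing works in two stages because addition is associative and commutative, which is why the same class can serve as both combiner and reducer here.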

For more on FileInputFormat, see the official documentation: Class FileInputFormat<K,V>


(End of article)