2014-04-11
Hadoop允许使用非Java编程语言实现mapreduce,这得益于其streaming机制,其实就是使用了Unix标准流(关于标准流的使用,请参考资料[1])。本文先论述如何基于流自制一个简单的单词计数的MapReducer,然后讨论如何使用Hadoop的streaming,最后介绍一个基于Python的Hadoop工具mrjob。
基于Linux管道的MapReducer
管道的作用是将一个命令的输出导入到另一个命令的输入,例如我们要查看进程nginx的信息,如下:
zsh >> ps -ef | grep 'nginx'
root 1243 1 0 08:44 ? 00:00:00 nginx: master process /usr/sbin/nginx
www-data 1244 1243 0 08:44 ? 00:00:00 nginx: worker process
www-data 1245 1243 0 08:44 ? 00:00:00 nginx: worker process
www-data 1247 1243 0 08:44 ? 00:00:00 nginx: worker process
www-data 1248 1243 0 08:44 ? 00:00:00 nginx: worker process
sunlt 4638 4385 0 09:38 pts/3 00:00:00 grep nginx
ps -ef
通过标准输出在终端里显示所有进程的信息,一行代表一个进程;grep 'nginx'
命令用来寻找含有nginx的行并输出,也就是说该命令会过滤掉不含nginx的行。管道符号 |
用于将ps -ef
的输出转换为grep 'nginx'
的输入,于是得到了进程nginx的信息。
另外,上面命令运行结果的最后一行是grep 'nginx' 这个进程的信息,如果要过滤掉,可以:
zsh >> ps -ef | grep 'nginx' | grep -v 'grep'
root 1243 1 0 08:44 ? 00:00:00 nginx: master process /usr/sbin/nginx
www-data 1244 1243 0 08:44 ? 00:00:00 nginx: worker process
www-data 1245 1243 0 08:44 ? 00:00:00 nginx: worker process
www-data 1247 1243 0 08:44 ? 00:00:00 nginx: worker process
www-data 1248 1243 0 08:44 ? 00:00:00 nginx: worker process
为了通过管道来实现MapReduce方式单词计数,先建立文件mapper.py执行map工作:
import sys
for line in sys.stdin:
ls = line.split()
for word in ls:
if len(word.strip()) != 0:
print word + ',' + str(1)
建立文件reducer.py执行reduce工作:
import sys
word_dict = {}
for line in sys.stdin:
ls = line.split(',')
word_dict.setdefault(ls[0], 0)
word_dict[ls[0]] += int(ls[1])
for word in word_dict:
print word, word_dict[word]
现有文件wordcount.input,内容如下:
aaa aa asd
asd
dsa asd
执行下面命令:
zsh >> cat wordcount.input | python mapper.py | python reducer.py
运行结果如下:
aa 1
dsa 1
aaa 1
asd 3
使用Hadoop Streaming
首先参考 Hadoop1.2配置伪分布式 使用Hadoop1.2 配置一个伪分布式集群。 启动集群:start-all.sh 将文件wordcount.input拷贝到hdfs的/下:
$ hadoop dfs -copyFromLocal wordcount.input /
执行下面的命令:
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /wordcount.input \
-output /output \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file mapper.py \
-file reducer.py
查看结果:
$ hadoop dfs -cat /output/part-00000
Warning: $HADOOP_HOME is deprecated.
aa 1
dsa 1
aaa 1
asd 3
如果是多个输入文件,可以将这些文件放在同一个目录下,例如将wordcount.input和wc.input放在/input目录下,然后执行下面的命令:
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /input \
-output /output \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file mapper.py \
-file reducer.py
查看结果:
zsh >> hadoop dfs -cat /output/part-00000
Warning: $HADOOP_HOME is deprecated.
aa 2
dsa 2
aaa 2
asd 6
使用mrjob
mrjob使用python编写,在资料[2]中如下介绍mrjob:
mrjob is a framework that assists you in submitting your job to the Hadoop job tracker and in running each individual step under Hadoop Streaming.
安装
应该先配置好环境变量$HADOOP_HOME,然后执行:
sudo pip install mrjob
当前版本是v0.4.2。
编写代码
在https://pythonhosted.org/mrjob/guides/quickstart.html下复制以下代码:
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):
def mapper(self, _, line):
yield "chars", len(line)
yield "words", len(line.split())
yield "lines", 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
MRWordFrequencyCount.run()
命名为word_count.py。 mrjob有一调试模式,并不真正使用hadoop,而只是模拟:
zsh >> python word_count.py wordcount.input
结果如下:
"chars" 20
"lines" 3
"words" 6
而要使用hadoop的话,执行:
python word_count.py -r hadoop hdfs://localhost:9000/input
犯了一个狗血的错误
按照0.4.2的文档,我复制了一下单词计数的代码,保存为mrjob.py,如下运行之:
python mrjob.py wordcount.input
提示找不到mrjob模块下的job。
代码第一行是:from mrjob.job import MRJob
难为了好大一会,终于找到原因,单词计数程序名和mrjob重名。于是改为wc.py,并删除mrjob.pyc。
python wc.py wordcount.input
成功运行。
资料
[1] 学习 Linux,101: 流、管道和重定向 http://www.ibm.com/developerworks/cn/linux/l-lpic1-v3-103-4/
[2] https://pythonhosted.org/mrjob/guides/concepts.html