Using Python's mrjob to run MapReduce on Hadoop

Preface:

     These past couple of days I have been busy moving my posts from 51cto back over to my own independent blog. I used to write only on 51cto, and only now do I realize how much I actually wrote there.

    On to the topic. MapReduce jobs are usually written either with the Java MapReduce API or in Python via Hadoop Streaming. As a Pythoner I do know a little Java, but not enough to do serious data analysis with it, so I ended up writing my MapReduce log analysis with Streaming. Here I want to introduce a module built on top of Streaming; the introduction on its official site looked interesting.


mrjob is essentially a thin wrapper around the Hadoop Streaming command line: it gives you a unified Python interface so you no longer have to invoke the Hadoop Streaming command directly. Since Streaming is still underneath, it is not necessarily fast, but at least in terms of the language it is much nicer to work with than the hand-written pipes I used before.


mrjob lets you write MapReduce jobs in Python and run them on a number of different platforms. You can:

  • Write multi-step MapReduce jobs in pure Python (see the sketch right after this list)

  • Test them on your local machine

  • Run them on a Hadoop cluster
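
As a quick illustration of the first point, here is a minimal sketch of a multi-step job. None of it comes from the original post: the class, the method names and the "most frequent IP" task are made up, and it assumes a mrjob version that ships mrjob.step.MRStep (older releases chained steps with self.mr() instead).

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRTopIP(MRJob):

    def steps(self):
        # two chained steps: count per IP, then pick the maximum
        return [
            MRStep(mapper=self.mapper_count, reducer=self.reducer_sum),
            MRStep(reducer=self.reducer_max),
        ]

    def mapper_count(self, _, line):
        # step 1: emit (token, 1) for each whitespace-separated token
        for token in line.split():
            yield token, 1

    def reducer_sum(self, ip, counts):
        # step 1: total per IP, re-keyed to None so step 2 sees a single group
        yield None, (sum(counts), ip)

    def reducer_max(self, _, count_ip_pairs):
        # step 2: the yielded pair becomes (key=count, value=ip)
        yield max(count_ip_pairs)

if __name__ == '__main__':
    MRTopIP.run()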

First, install the mrjob package with pip:

pip install mrjob

Below is the example I used for testing:

#coding:utf-8
from mrjob.job import MRJob
import re
# xiaorui.cc
# WORD_RE = re.compile(r"[\w']+")  # the usual word-count regex, kept for reference
WORD_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")  # match IPv4-looking tokens

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # emit (ip, 1) for every IP address found in the line
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        # pre-aggregate on the mapper side to cut down shuffle volume
        yield word, sum(counts)

    def reducer(self, word, counts):
        # final total per IP
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordFreqCount.run()
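
Besides running it from the shell, the same job can be driven from another Python script, which is handy for quick local checks. The snippet below is only a sketch based on the testing pattern in the mrjob docs of that era: it assumes the job above is saved as mo.py, the two log lines are made up, and the runner methods shown here (sandbox, make_runner, stream_output, parse_output_line) have been renamed in newer mrjob releases, so treat it as a starting point rather than gospel.

# sketch: run the job above in-process against two fabricated log lines
from io import BytesIO

from mo import MRWordFreqCount  # assumes the job above was saved as mo.py

job = MRWordFreqCount(['-r', 'inline', '-'])  # '-' means read from stdin
job.sandbox(stdin=BytesIO(b'10.7.17.7 query example.com\n'
                          b'10.7.17.8 query example.org\n'))

with job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        ip, count = job.parse_output_line(line)
        print(ip, count)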


Running mrjob from the command line is straightforward:

python i.py -r inline input1 input2 input3 > out processes several input files at once and writes the combined results into the out file.

Simulate a Hadoop run locally: python 1.py -r local < input > output

This writes the results into output; the output target has to be given.

Run on a Hadoop cluster: python 1.py -r hadoop < input > output


[root@kspc ~]# python mo.py -r local < 10.7.17.7-dnsquery.log.1 > output

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/mo.root.20131224.040935.241241
reading from STDIN
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00000 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00001 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
Counters from step 1:
  (no counters found)
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper-sorted
> sort /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000 /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00000 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00001 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
Counters from step 1:
  (no counters found)
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000 -> /tmp/mo.root.20131224.040935.241241/output/part-00000
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001 -> /tmp/mo.root.20131224.040935.241241/output/part-00001
Streaming final output from /tmp/mo.root.20131224.040935.241241/output

removing tmp directory /tmp/mo.root.20131224.040935.241241

Resource usage while the job was running:

One interesting detail: mrjob actually shells out to the sort command to do its sorting.

To get a better feel for how mrjob is used, here is another example.

from mrjob.job import MRJob
# from xiaorui.cc

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        # emit one counter per metric for each input line
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        # sum each counter across all lines
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()

The results are again tab-separated key/value lines, which follows from mrjob's default protocols (see the sketch below).
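
Why is the mapper's first argument just "_", and why does each output line come out as a key, a tab, and a value? That is down to mrjob's protocols. The class below is purely illustrative: as far as I can tell it only spells out the defaults that mrjob applies anyway, so writing them explicitly changes nothing.

# illustrative: mrjob's default protocols written out explicitly
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol, RawValueProtocol

class MRWordFrequencyCountExplicit(MRJob):
    INPUT_PROTOCOL = RawValueProtocol    # value is the raw text line, key is None
    INTERNAL_PROTOCOL = JSONProtocol     # serialization between mapper and reducer
    OUTPUT_PROTOCOL = JSONProtocol       # final lines: JSON key, tab, JSON value

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCountExplicit.run()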

Below are some usage notes taken from the official docs:

As you can see, it supports both HDFS and S3 as storage!
Running your job different ways
The most basic way to run your job is on the command line:
$ python my_job.py input.txt
By default, output will be written to stdout.
You can pass input via stdin, but be aware that mrjob will just dump it to a file first:
$ python my_job.py < input.txt
You can pass multiple input files, mixed with stdin (using the - character):
$ python my_job.py input1.txt input2.txt - < input3.txt
By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!
You change the way the job is run with the -r/--runner option. You can use -r inline (the default), -r local, -r hadoop, or -r emr.
To run your job in multiple subprocesses with a few Hadoop features simulated, use -r local.
To run it on your Hadoop cluster, use -r hadoop.
If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -r emr.
Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:

$ python sqlite_job.py -r local  --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r hadoop --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r hadoop --database=hdfs://my_dir/my_db.sqlite3
$ python sqlite_job.py -r emr    --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r emr    --database=s3://my_bucket/my_db.sqlite3
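
Note that --database in these commands is not a built-in mrjob switch; it is a file option that the sqlite_job script declares itself, and mrjob then ships the file to every task. Below is a rough sketch of that idea using the older configure_options / add_file_option API (newer mrjob renamed these to configure_args / add_file_arg); the job name, table and column names are all made up.

# sketch: a job that declares its own --database file option
import sqlite3

from mrjob.job import MRJob

class MRSqliteJob(MRJob):

    def configure_options(self):
        super(MRSqliteJob, self).configure_options()
        # mrjob uploads the file and points the option at the local copy
        self.add_file_option('--database')

    def mapper_init(self):
        # open the shipped copy of the database once per mapper
        self.db = sqlite3.connect(self.options.database)

    def mapper(self, _, line):
        # hypothetical lookup: resolve the first field of the line to a name
        user_id = line.split()[0]
        row = self.db.execute(
            'SELECT name FROM users WHERE id = ?', (user_id,)).fetchone()
        if row:
            yield row[0], 1

    def reducer(self, name, counts):
        yield name, sum(counts)

if __name__ == '__main__':
    MRSqliteJob.run()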

To be honest, I don't particularly recommend it. It looks high-end, but it doesn't solve the fundamental problem: what we need is performance and processing speed, and that is exactly what it cannot give us. So I would still reach for Pig and Hive from the Hadoop stack, unless of course you are like me: not fluent in Java, but very comfortable with Python.




