用multiprocessing封装python rq worker多进程模式


又要说起rq了,python的rq是个简单到没朋友的任务队列。 记得去年以前用的都是celery,但自从看了接触rq并看了代码片段后,发现实现这东西实现很是有意思。 这边有个场景需要启动多个rqworker,但是rq可能就是因为他太简单,有些功能还没做… 比如我现在说的多进程模式。在rq 的github中看了不少老外们提的issue,有一些话题跟我今天说的一样,是关于rq多进程模式的。  其实还是有闲人提交了python rq的多个worker模式,有forking,multiprocessing、gevent的模式。 这些启动的方式也破简单,rqworker -w ‘path.modules’ . 我看了下他们的源代码,实现起来有些费劲。  python rq已经给出了一个例子,虽然是单线程阻塞的版本。 另外说明下,如果不认真看文档的话,会认为rqworker –burst是个多进程的解决方案,但他在官方的定义是突发解决模式,何为突发(burst) 模式?  我临时起个进程处理任务,处理完了后,我自己退出进程…..  这也太粗糙了…..

文章总是不断更新中,这里注明下原文地址.  http://xiaorui.cc/2015/11/14/%E7%94%A8multiprocessing%E5%B0%81%E8%A3%85python-rq-worker%E5%A4%9A%E8%BF%9B%E7%A8%8B%E6%A8%A1%E5%BC%8F/

#!/usr/bin/env python
#blog: xiaorui.cc
import sys
from rq import Queue, Connection, Worker

#使用with阻塞Connection()连接任务队列,使用rq Worker来启动任务。
with Connection():
    qs = map(Queue, sys.argv[1:]) or [Queue()]
    w = Worker(qs)
    w.work()

使用multiprocessing实现的rq多进程版本

import os
import redis
import multiprocessing
from rq import Worker, Queue, Connection
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
def worker():
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    for i in xrange(3):
        pool.apply_async(worker,)
    pool.close()
    pool.join()

但是这有个缺点,Connection()会一直brpop redis队列,那么你用ctrl c终止任务的时候会失败。 那么肿么搞?  当然最简单的方法使用python threading多线程去监听ctrl c. 
除此之外,还可以用signal信号的方式来解决。

import os
import logging
import sys
import signal
import redis
import multiprocessing
from rq import Worker, Queue, Connection
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)


def sigint_handler(signum,frame):
    for i in pid_list:
        os.kill(i,signal.SIGKILL)
    logging.info("exit...")
    sys.exit()


def worker():
    logging.info('this is worker')
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()


pid_list = []
signal.signal(signal.SIGINT,sigint_handler)
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)   #multiprocessing pool多进程池,processes=4 指定进程数
    for i in xrange(3):
        pool.apply_async(worker,)
    for i in multiprocessing.active_children():
        pid_list.append(i.pid)
    pid_list.append(os.getpid())
    pool.close()
    pool.join()

下面是返回的结构… 我们会发现多进程生效了,任务并行了。。。。

#coding:utf-8
#blog : xiaorui.cc
import time

def test_mq(*args,**kwargs):
    time.sleep(10)     #我这里sleep 10秒钟
    print 'mq...'
    return "You said "

__all__ = ('test_mq')

13:51:12 default: api.test_mq(123, 456, name=1) (b76c23f4-0c6b-4052-9983-b6f2933e2f6c)
13:51:13 default: api.test_mq(123, 456, name=1) (2ec17177-e707-4f9c-904a-c6320a4dc10b)
13:51:15 default: api.test_mq(123, 456, name=1) (d266575f-db75-432f-a075-890676f546f6)

mq…
13:51:22 Job OK, result = u’\x1b[33mYou said \x1b[39;49;00m’
13:51:22 Result is kept for 500 seconds
13:51:22
13:51:22 *** Listening on high, default, low…
mq…
13:51:23 Job OK, result = u’\x1b[33mYou said \x1b[39;49;00m’
13:51:23 Result is kept for 500 seconds
13:51:23
13:51:23 *** Listening on high, default, low…
mq…
13:51:25 Job OK, result = u’\x1b[33mYou said \x1b[39;49;00m’
13:51:25 Result is kept for 500 seconds
13:51:25
13:51:25 *** Listening on high, default, low…

对于rq任务队列的多进程模式就说完了,有兴趣的朋友可以看看rq worker模块相关的实现代码。   

….


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

电子邮件地址不会被公开。 必填项已用*标注