又要说起rq了,python的rq是个简单到没朋友的任务队列。 记得去年以前用的都是celery,但自从看了接触rq并看了代码片段后,发现实现这东西实现很是有意思。 这边有个场景需要启动多个rqworker,但是rq可能就是因为他太简单,有些功能还没做… 比如我现在说的多进程模式。在rq 的github中看了不少老外们提的issue,有一些话题跟我今天说的一样,是关于rq多进程模式的。 其实还是有闲人提交了python rq的多个worker模式,有forking,multiprocessing、gevent的模式。 这些启动的方式也破简单,rqworker -w ‘path.modules’ . 我看了下他们的源代码,实现起来有些费劲。 python rq已经给出了一个例子,虽然是单线程阻塞的版本。 另外说明下,如果不认真看文档的话,会认为rqworker –burst是个多进程的解决方案,但他在官方的定义是突发解决模式,何为突发(burst) 模式? 我临时起个进程处理任务,处理完了后,我自己退出进程….. 这也太粗糙了…..
文章总是不断更新中,这里注明下原文地址. http://xiaorui.cc/2015/11/14/%E7%94%A8multiprocessing%E5%B0%81%E8%A3%85python-rq-worker%E5%A4%9A%E8%BF%9B%E7%A8%8B%E6%A8%A1%E5%BC%8F/
#!/usr/bin/env python #blog: xiaorui.cc import sys from rq import Queue, Connection, Worker #使用with阻塞Connection()连接任务队列,使用rq Worker来启动任务。 with Connection(): qs = map(Queue, sys.argv[1:]) or [Queue()] w = Worker(qs) w.work()
使用multiprocessing实现的rq多进程版本
import os import redis import multiprocessing from rq import Worker, Queue, Connection listen = ['high', 'default', 'low'] redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379') conn = redis.from_url(redis_url) def worker(): with Connection(conn): worker = Worker(map(Queue, listen)) worker.work() if __name__ == '__main__': pool = multiprocessing.Pool(processes=4) for i in xrange(3): pool.apply_async(worker,) pool.close() pool.join()
但是这有个缺点,Connection()会一直brpop redis队列,那么你用ctrl c终止任务的时候会失败。 那么肿么搞? 当然最简单的方法使用python threading多线程去监听ctrl c.
除此之外,还可以用signal信号的方式来解决。
import os import logging import sys import signal import redis import multiprocessing from rq import Worker, Queue, Connection listen = ['high', 'default', 'low'] redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379') conn = redis.from_url(redis_url) def sigint_handler(signum,frame): for i in pid_list: os.kill(i,signal.SIGKILL) logging.info("exit...") sys.exit() def worker(): logging.info('this is worker') with Connection(conn): worker = Worker(map(Queue, listen)) worker.work() pid_list = [] signal.signal(signal.SIGINT,sigint_handler) if __name__ == '__main__': pool = multiprocessing.Pool(processes=4) #multiprocessing pool多进程池,processes=4 指定进程数 for i in xrange(3): pool.apply_async(worker,) for i in multiprocessing.active_children(): pid_list.append(i.pid) pid_list.append(os.getpid()) pool.close() pool.join()
下面是返回的结构… 我们会发现多进程生效了,任务并行了。。。。
#coding:utf-8 #blog : xiaorui.cc import time def test_mq(*args,**kwargs): time.sleep(10) #我这里sleep 10秒钟 print 'mq...' return "You said " __all__ = ('test_mq')
13:51:12 default: api.test_mq(123, 456, name=1) (b76c23f4-0c6b-4052-9983-b6f2933e2f6c)
13:51:13 default: api.test_mq(123, 456, name=1) (2ec17177-e707-4f9c-904a-c6320a4dc10b)
13:51:15 default: api.test_mq(123, 456, name=1) (d266575f-db75-432f-a075-890676f546f6)
mq…
13:51:22 Job OK, result = u’\x1b[33mYou said \x1b[39;49;00m’
13:51:22 Result is kept for 500 seconds
13:51:22
13:51:22 *** Listening on high, default, low…
mq…
13:51:23 Job OK, result = u’\x1b[33mYou said \x1b[39;49;00m’
13:51:23 Result is kept for 500 seconds
13:51:23
13:51:23 *** Listening on high, default, low…
mq…
13:51:25 Job OK, result = u’\x1b[33mYou said \x1b[39;49;00m’
13:51:25 Result is kept for 500 seconds
13:51:25
13:51:25 *** Listening on high, default, low…
对于rq任务队列的多进程模式就说完了,有兴趣的朋友可以看看rq worker模块相关的实现代码。
….