前言:
python rq是个不错的任务队列服务,就因为他小而精,所有在一定程度上比celery都要受到欢迎。 最近有个需求让我不得不想喷他,他没有定时执行的功能,很多时候我并不想直接就执行执行。 起初有个简单的想法,想把任务丢给处理函数后,一直在sleep,直到任务可以执行。
关于rq的定时计划任务写得不怎么严谨,后期会不断更新修改,标注下原文地址:
问题:
但问题又来了,rq是个单进程,单任务处理模式,他的默认worker是不支持并发执行的,我很是想不明白 (默认worker是os.fork()模式,多fork几个不就多进程了)。 python rq文档里有个burst参数,汉语意思是突发模式,以为是并发执行任务… 结果,呵呵…
起初的那想法是不靠谱,就算rq支持多进程、多线程模式,也不适合这样的场景. 因为我这边会主动仍enqueue入5000个任务,这些任务都做了定时处理,拿到我要5000个sleep time? 不合理…
解决方案:
那么怎么搞 ? 用redis的sorted set + delay_server + rq worker
实现的逻辑比较简单,你把任务暂时扔到redis有序集合里面,然后由delay_server去处理,当符合定时的时间后,塞入rq的queue里面,由rq来消费.
部分代码片段是来源于 http://www.velvetcache.org/2013/04/15/delayed-queues-for-rq . 这哥们代码是放在gist,我本人对于github gist的fork看不顺眼, 所以我另起一个项目,另外封装了老外那些代码,做了一些适合我这边场景的更新.
下面是我更新的几个功能:
首先在这基础上加入了rqworker的多进程模式,并且在rqworker里加入时间队列的判断逻辑. 这样把rqworker跟rq_timer集合在一个启动脚本里面了,避免了一个项目中旺季启动多个进程的情况.
加入计划任务的功能,虽然没有crontab取模那么精准,但是误差了了.
题外话,rq-scheduler的实现也是类似于有序队列那样,每个任务都含有任务的间隔时间.
这里的的rq_worker.py多进程模式是使用python multiprocessing pool实现,函数及参数的传递序列化是pickle实现.
项目地址是, https://github.com/rfyiamcool/python_rq_timer
#coding:utf-8 #blog: xiaorui.cc #email: rfyiamcool@163.com import os import logging import sys import signal import redis import time import multiprocessing from rq import Worker, Queue, Connection from delay import DelayedJob listen = ['high', 'default', 'low'] redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379') conn = redis.from_url(redis_url) def sigint_handler(signum,frame): for i in pid_list: os.kill(i,signal.SIGKILL) logging.info("exit...") sys.exit() def soon_worker(): logging.info('this is worker') with Connection(conn): worker = Worker(map(Queue, listen)) worker.work() def delay_worker(): while True: print 'Enqueued %d Jobs.' % delayed_jobs.enqueue_delayed_jobs() try: time.sleep(5) except KeyboardInterrupt: print "Shutting Down" break pid_list = [] signal.signal(signal.SIGINT,sigint_handler) #redis_connection = Redis() delayed_jobs = DelayedJob(conn) if __name__ == '__main__': pool = multiprocessing.Pool(processes=5) pool.apply_async(delay_worker,) for i in xrange(3): pool.apply_async(soon_worker,) for i in multiprocessing.active_children(): print i pid_list.append(i.pid) pid_list.append(os.getpid()) pool.close() pool.join()
这样实现的rq计划任务有些不优雅,有兴趣的朋友可以改进下.
END.
有个rq-scheduler