python rq的定时及计划任务(delay)实现

前言:

python rq是个不错的任务队列服务,就因为他小而精,所有在一定程度上比celery都要受到欢迎。 最近有个需求让我不得不想喷他,他没有定时执行的功能,很多时候我并不想直接就执行执行。 起初有个简单的想法,想把任务丢给处理函数后,一直在sleep,直到任务可以执行。

关于rq的定时计划任务写得不怎么严谨,后期会不断更新修改,标注下原文地址:

http://xiaorui.cc/2015/12/15/python-rq%E7%9A%84%E5%AE%9A%E6%97%B6%E5%8F%8A%E8%AE%A1%E5%88%92%E4%BB%BB%E5%8A%A1delay%E5%AE%9E%E7%8E%B0/


问题:

但问题又来了,rq是个单进程,单任务处理模式,他的默认worker是不支持并发执行的,我很是想不明白 (默认worker是os.fork()模式,多fork几个不就多进程了)。  python rq文档里有个burst参数,汉语意思是突发模式,以为是并发执行任务… 结果,呵呵… 

起初的那想法是不靠谱,就算rq支持多进程、多线程模式,也不适合这样的场景.  因为我这边会主动仍enqueue入5000个任务,这些任务都做了定时处理,拿到我要5000个sleep time? 不合理…

解决方案:

那么怎么搞 ? 用redis的sorted set + delay_server + rq worker 

实现的逻辑比较简单,你把任务暂时扔到redis有序集合里面,然后由delay_server去处理,当符合定时的时间后,塞入rq的queue里面,由rq来消费.

部分代码片段是来源于 http://www.velvetcache.org/2013/04/15/delayed-queues-for-rq . 这哥们代码是放在gist,我本人对于github gist的fork看不顺眼, 所以我另起一个项目,另外封装了老外那些代码,做了一些适合我这边场景的更新.

下面是我更新的几个功能:

首先在这基础上加入了rqworker的多进程模式,并且在rqworker里加入时间队列的判断逻辑. 这样把rqworker跟rq_timer集合在一个启动脚本里面了,避免了一个项目中旺季启动多个进程的情况.
加入计划任务的功能,虽然没有crontab取模那么精准,但是误差了了. 

题外话,rq-scheduler的实现也是类似于有序队列那样,每个任务都含有任务的间隔时间.

这里的的rq_worker.py多进程模式是使用python multiprocessing pool实现,函数及参数的传递序列化是pickle实现.


项目地址是, https://github.com/rfyiamcool/python_rq_timer

#coding:utf-8
#blog: xiaorui.cc
#email: rfyiamcool@163.com

import os
import logging
import sys
import signal
import redis
import time
import multiprocessing

from rq import Worker, Queue, Connection
from delay import DelayedJob

listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)

def sigint_handler(signum,frame):
    for i in pid_list:
        os.kill(i,signal.SIGKILL)
    logging.info("exit...")
    sys.exit()


def soon_worker():
    logging.info('this is worker')
    with Connection(conn):
        worker = Worker(map(Queue, listen))
        worker.work()

def delay_worker():
    while True:
        print 'Enqueued %d Jobs.' % delayed_jobs.enqueue_delayed_jobs()
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            print "Shutting Down"
            break

pid_list = []
signal.signal(signal.SIGINT,sigint_handler)

#redis_connection = Redis()
delayed_jobs = DelayedJob(conn)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=5)
    pool.apply_async(delay_worker,)
    for i in xrange(3):
        pool.apply_async(soon_worker,)
    for i in multiprocessing.active_children():
        print i
        pid_list.append(i.pid)
    pid_list.append(os.getpid())
    pool.close()
    pool.join()

这样实现的rq计划任务有些不优雅,有兴趣的朋友可以改进下. 

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

1 Response

mie_mie_mie_进行回复 取消回复

邮箱地址不会被公开。 必填项已用*标注