这段时间跟同事聊了些任务优先级队列的话题,正好我们这把大量的celery抛弃,开始用rq来做任务队列,因为celery太大,所以改造起来甚是恶心到爆,还不如选择一个简单干练的python rq。

xiaorui.cc

xiaorui.cc

xiaorui.cc

xiaorui.cc


微信的抓取api,我是用django写的,任务队列也是用django-rq,定时任务也是用的rq…. 

看了一些rq的源码实现,看得过程正好和同事谈论到任务的优先级队列,基于redis的实现。  rq的名字由来就是 redis queue….

high = Queue(‘high’, default_timeout=8)  # 8 secs
low = Queue(‘low’, default_timeout=600)  # 10 mins
low.enqueue_call(really_really_slow, timeout=3600)  #

他的实现方式比较的简单,你可以配置n个队列,根据你的业务逻辑产生n个评判的等级,然后根据你所得到的等级扔到不同的队列里面。 


high    :    <=== [][][][][][][]  <===!
middle  :                                      <=== [][][][][][][]  <===!
low     :                                                                <======== [][][][][][][]  <=======!


python rq把redis的一些标准语法也改掉了,比如lpop,他就是进行迭代的取出你的优先级队列。


其实除了用多个list结构来实现多层的任务队列,我们还是可以用 sorted set来实现的…. 

他相比于用list做任务队列的好处在于,他含有score分值的概念,这个分值也可以是时间戳,那么咱们用redis的zrange或者ZRANGEBYSCORE过滤出一定时间范围内的数据。

#0就算开头了,-1 肯定是结尾了
redis 127.0.0.1:6379> zrange myzset 0 -1 withscores

1) “one”

2) “1″

3) “two”

4) “3″

返回集合中score在给定区间的元素
#这里是返回2和3区间的数据
redis 127.0.0.1:6379> zrangebyscore myzset3 2 3 withscores

1) “two”

2) “2″

3) “three”

4) “3″

而外在提下,sorted是可以出现同一个score的,但是value一定不能一样…. 

相同的score,这情况还是会出现的,比如当score是时间戳的时候…. 

redis 127.0.0.1:6379> zadd xiaorui.cc 1 1
(integer) 1
redis 127.0.0.1:6379> zadd xiaorui.cc 1 ‘one’
(integer) 1
redis 127.0.0.1:6379> zadd xiaorui.cc 2 ’2′
(integer) 1
redis 127.0.0.1:6379> zadd xiaorui.cc 2 ‘two’
(integer) 1

redis 127.0.0.1:6379> ZRANGE xiaorui.cc 0 -1 withscores
1) “1″
2) “1″
3) “one”
4) “1″
5) “2″
6) “2″
7) “two”
8) “2″

咱们在看看rq的worker是如何运行的…

最主要的思想就是 他会不停的取出任务,调用execute_job(job),  函数fork一个子进程,这也是咱们每次修改完程序,不用再重启python rq的原因。 

我这里简单说下,我的同事朱伟读了我这篇的文章,指出了我的一个观点,这个观点就是在一个主进程里面,你不管怎么fork,他的子进程的环境是集成于主环境的。 所以,他的自动重载估计是做了reload得机制…

python rq的默认执行时间是360秒,上星期跑大量的数据时,发现大量的timeout的error….  原本以为enqueue的时候,不加timeout参数,rq默认应该不会限制时间,结果他是420s-60s…  这个数字在rq的官网居然没找到…   看了下work.py源码,他是这么说的。。。

通过rq的job类,设置和查询每个jobid的信息,在每个流程下,rq都会记录下他的过程的。 当我的子进程正在执行任务之前,我会发出一个busy的信息,这样work主调度逻辑会pass,不会每次都fork进程。



对Python及运维开发感兴趣的朋友可以加QQ群 : 478476595 !!!

另外如果大家觉得文章对你有些作用!   帮忙点击广告. 一来能刺激我写博客的欲望,二来好维护云主机的费用.
如果想赏钱,可以用微信扫描下面的二维码. 另外再次标注博客原地址  xiaorui.cc  ……   感谢!

python rq的定时及计划任务(delay)实现

前言: python rq是个不错的任务队列服务,就因为他小而精,所有在一定程度上比celery都要受到欢迎。 最近有个需求让我不得不想喷他,他没有定时执行的功能...

阅读全文

用multiprocessing封装python rq worker多进程模式

又要说起rq了,python的rq是个简单到没朋友的任务队列。 记得去年以前用的都是celery,但自从看了接触rq并看了代码片段后,发现实现这东西实现很是有意思...

阅读全文

解决python rq获取返回结果和异常的问题

首先说明下python rq的一个错误提示  Functions from the __main__ module cannot be processed ,  这个提示是在enqueue的时候发生的,他会检测...

阅读全文

  1. 请问发现大量的timeout的error…,这个如何解决,timeout的含义是什么,是等待timeout固定时间,没有得到执行就报错吗

  2. 我想问一下 那个 self.heartbeat() 是什么用处?我运行的时候 总是有条debug信息Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.