如何用python简单的设计开发异步任务调度队列

什么是任务调度队列,在python里面有不少这样的好东西,比如celery,RQ,huey    。  这三个我都用过,也都是在线上的环境下应用,分布式的节点也有不少。  他主要是解决任务的异步性,比如你找回密码的时候需要邮件的发送token,这时候不太可能是同步,因为发邮件这种事情,本身就有很多的不确定性,一个点背,就会堵塞逻辑。 这时候我们可以用celery rq这样的任务队列来离线的发送邮件。

文章的原文是在 http://xiaorui.cc/?p=1561

那么如何自己开发一套这样的任务队列,我相信有不少人用过 生产者 –》MQ –》 消费者….   这个是最典型的离线非堵塞的实现逻辑,但是如何实现任务的计划任务性,如何实现超时的控制,如何实现优先级队列,如何实现计划的在线修改,如何实现任务的进度。 是的,这些celery,rq早就实现了,我还就喜欢造轮子…  虽然没有他们造的好,但因为是自主开发,所以整个逻辑简单干练,而不像celery那么沉重… …


造轮子开始,我懒得用在线的画图,就在本子上随意的写了写,画了画,其实写博客,原本就是留着自己做笔记的,没想着如何忽悠大家…. 


首先,客户端可以直接扔任务到一个web services的接口上    –》  web api接收到任务后,会根据客户端的ip和时间戳做task_id,返回给客户,紧接着在redis里面标记这任务的状态。    格式为  func,args,kwargs,timeout=xx,queue_level=xx,interval_time=xx

主服务端:

一个线程,会不停的扫描那个redis hash表,取出任务的interval_time后,进行取模,如果匹配成功,就会塞到 redis sorted set有续集和里面。

主线程,会不停的看看sorted set里面,有没有比自己实现小的任务,有的话,执行并删除。 这里的执行是用多进程,为毛用多进程,因为线程很多时候是不好控制强制干掉的。 每个任务都会用multiprocessing的方式去执行,去调用的时候,会多传进一个task_id,用来把相关的进度推送到redis里面。 另外,fork进程后,我会得到一个pid,我会把pid和timeout的信息,存放到kill_hash里面。 然后会不间断的查看,在指定的timeout内,这pid还在不在,如果还是存在,没有退出的话,说明他的任务不太正常,我们就可以在main(),里面干掉这些任务。 

所谓的优先级就是个 High + middle +Low 的三合一链条而已,我每次都会坚持从高到低取任务,如果你的High级别的任务不断的话,那么我会一直干不了低级别的任务了。 代码的体现是在redis sorted set这边,设立三个有序集合,我的worker队列会从high开始做……

那么如果想干掉一个任务是如何操作的,首先我需要在 kill_hash 里面标记任务应该赶紧干掉,在就是在task_hash里面把那个task_id干掉,好让他不会被持续的加入待执行的队列里面。 

真的好久没有写字了,发现字好难看呀…   








大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

3 Responses

  1. duma 2015年8月10日 / 下午3:57

    你笔误了,不是hury,而是huey

duma进行回复 取消回复

邮箱地址不会被公开。 必填项已用*标注