什么是任务调度队列,在python里面有不少这样的好东西,比如celery,RQ,huey    。  这三个我都用过,也都是在线上的环境下应用,分布式的节点也有不少。  他主要是解决任务的异步性,比如你找回密码的时候需要邮件的发送token,这时候不太可能是同步,因为发邮件这种事情,本身就有很多的不确定性,一个点背,就会堵塞逻辑。 这时候我们可以用celery rq这样的任务队列来离线的发送邮件。

文章的原文是在 http://xiaorui.cc/?p=1561

那么如何自己开发一套这样的任务队列,我相信有不少人用过 生产者 –》MQ –》 消费者….   这个是最典型的离线非堵塞的实现逻辑,但是如何实现任务的计划任务性,如何实现超时的控制,如何实现优先级队列,如何实现计划的在线修改,如何实现任务的进度。 是的,这些celery,rq早就实现了,我还就喜欢造轮子…  虽然没有他们造的好,但因为是自主开发,所以整个逻辑简单干练,而不像celery那么沉重… …


造轮子开始,我懒得用在线的画图,就在本子上随意的写了写,画了画,其实写博客,原本就是留着自己做笔记的,没想着如何忽悠大家…. 


首先,客户端可以直接扔任务到一个web services的接口上    –》  web api接收到任务后,会根据客户端的ip和时间戳做task_id,返回给客户,紧接着在redis里面标记这任务的状态。    格式为  func,args,kwargs,timeout=xx,queue_level=xx,interval_time=xx

主服务端:

一个线程,会不停的扫描那个redis hash表,取出任务的interval_time后,进行取模,如果匹配成功,就会塞到 redis sorted set有续集和里面。

主线程,会不停的看看sorted set里面,有没有比自己实现小的任务,有的话,执行并删除。 这里的执行是用多进程,为毛用多进程,因为线程很多时候是不好控制强制干掉的。 每个任务都会用multiprocessing的方式去执行,去调用的时候,会多传进一个task_id,用来把相关的进度推送到redis里面。 另外,fork进程后,我会得到一个pid,我会把pid和timeout的信息,存放到kill_hash里面。 然后会不间断的查看,在指定的timeout内,这pid还在不在,如果还是存在,没有退出的话,说明他的任务不太正常,我们就可以在main(),里面干掉这些任务。 

所谓的优先级就是个 High + middle +Low 的三合一链条而已,我每次都会坚持从高到低取任务,如果你的High级别的任务不断的话,那么我会一直干不了低级别的任务了。 代码的体现是在redis sorted set这边,设立三个有序集合,我的worker队列会从high开始做……

那么如果想干掉一个任务是如何操作的,首先我需要在 kill_hash 里面标记任务应该赶紧干掉,在就是在task_hash里面把那个task_id干掉,好让他不会被持续的加入待执行的队列里面。 

真的好久没有写字了,发现字好难看呀…   









对Python及运维开发感兴趣的朋友可以加QQ群 : 478476595 !!!
{ 2000人qq大群内有各厂大牛,常组织线上分享及沙龙,对高性能及分布式场景感兴趣同学欢迎加入该QQ群 }

另外如果大家觉得文章对你有些作用!   帮忙点击广告. 一来能刺激我写博客的欲望,二来好维护云主机的费用.
如果想赏钱,可以用微信扫描下面的二维码. 另外再次标注博客原地址  xiaorui.cc  ……   感谢!
暂无相关产品

3则回应给“如何用python简单的设计开发异步任务调度队列”

  1. 歪妖内涵网说道:

    网站不错,雁过留痕,欢迎互访!

  2. duma说道:

    你笔误了,不是hury,而是huey

发表评论