前言:
有朋友问我,我那个任务队列是怎么实现,他的疑问其实主要是celery不支持多线程。先说说我那实现的方法,其实我的做法和celery、rq这样的框架很像的,都是把任务push到队列里面,然后pull取出任务而已,celery里面还可以取任务,我这个是通过传送uuid来实现的。 朋友问celery不支持多线程,那是他没有好好看文档。celery是支持多任务并发的,哎。。。 好好看文档呀。
原文:http://rfyiamcool.blog.51cto.com/1030776/1530826
队列存储brokers用的是rabbitmq,后面测试下用mongodb搞搞。我这里做个测试:
下面是tasks.py文件,也就是celery能支持异步的函数。
@app.task def add(x, y): wlog() sleep(10) return x + y
后端启动含有100个线程的线程池。
celery -A tasks worker -c 100 --loglevel=info
在ipython测试的结果:
看看我自己输出的日志,结果很明显,是并发的:
原文:http://rfyiamcool.blog.51cto.com/1030776/1530826
celery是支持好几个并发模式的,有prefork,threading,协程(gevent,eventlet)
prefork在celery的介绍是,用了multiprocess来实现的。多线程就补多少了,估计大家都懂。
说说协程,进程 线程经常玩,也算熟悉,话说协程算是一种轻量级进程,但又不能叫进程,因为操作系统并不知道它的存在。什么意思呢,就是说,协程像是一种在程序级别来模拟系统级别的进程,由于是单进程,并且少了上下文切换,于是相对来说系统消耗很少,而且网上的各种测试也表明,协程确实拥有惊人的速度。并且在实现过程中,协程可以用以前同步思路的写法,而运行起来确是异步的,也确实很有意思。话说有一种说法就是说进化历程是多进程->多线程->异步->协程,当然协程也有弊端,但是如果你的任务类型不是那种cpu密集的,那选用协程是个好选择。
但是需要说明的是,虽然celery官网提示说,只要在启动worker的时候,指明下类型就行了,但是如果你逻辑里面的模块有些不支持协程 gevent或者是eventlet异步的话,他还是会堵塞的。
gevent1.x之后虽然是支持subprocess的用法,gevent这个模块给非堵塞了,和他有同样功能的os.popen(‘sleep 10’).read() 是会堵塞的,据说gevent官方不支持popen的协程的用法。
看了下celery 针对gevent方面的调用,他其实就是引入了gevent的patch 。 那这样会造成堵塞的问题,如果gevent不支持这些模块,那。。。。
from gevent import monkey monkey.patch_all()
反之threading的用法倒是简单明了,支持把任务放在线程pool里面来处理。
话说回来,我的title为什么说gevent来提高性能。我和小伙伴做一些gevent支持的模块写函数,做多任务处理的时候,性能确实要比threading要高,还要稳定。
小计:
清理celery产生的数据
#清空 celery purge #清空 from celery.task.control import discard_all discard_all()
查看celery rabbitmq队列信息
rabbitmqctl list_queues
没了 !
你好,我有一个问题请假一下,在任务处理后返回数据这点,能不能自己添加一个回调处理数据并插入到数据库?
完全可以,虽然celery提供了分布式获取的task信息的接口,但是我个人还是喜欢直接写到业务的库里面