celery rabbitmq实现任务队列的异步执行

前言:

      celery这东西在任务调度方面,很有一套的,用了他也有几年了,下面就给大家介绍下我以前使用过celery的项目。

Hello ,最近总是被爬虫,标记下博客的原文地址 blog.xiaorui.cc


对于上面的场景,我曾经用tornado和gevent的方案解决,但是在我的理解范围下,感觉还是不算成熟。 tornado把任务异步后,总是影响了他的高性能。再说gevent,他是个模拟事件的东西,但是后面由于认识越来越多,有时候因为对调的其他接口不给力,导致我们一下子处理将近1000多个请求,这时候会出现莫名的bug,听说已经有patch解决了。


那么为啥要用celery ?

很简单,就是把堵塞的任务,扔到mq里面,让其他人来搞。搞的定、搞不定都回给你callback信息。 


那问题来了 ! 这又和rabbitmq又有啥关系?


和rabbitmq的关系只是在于,celery没有消息存储功能,他需要介质,比如rabbitmq redis mysql mongodb 都是可以的。有这个可控的东西,你也可以在库里面搞搞。推荐使用rabbitmq,他的速度和可用性都很高。如果你想后期方面的处理broker,那还是用redis吧。 redis的语法相对的简单点。 


Celery和RabbitMQ是两个层面的东西。
Celery是一个分布式的任务队列。它的基本工作就是管理分配任务到不同的服务器,并且取得结果。至于说服务器之间是如何进行通信的?这个Celery本身不能解决。
所以,RabbitMQ作为一个消息队列管理工具被引入到和Celery集成,负责处理服务器之间的通信任务。

现在的Celery早已增加了一些对Redis,MongoDB之类的支持。原因是RabbitMQ尽管足够强大,但对于一些相对简单的业务环境来说可能太多(复杂)了一些。这样用户可以有多一些的选择。

celery的介绍 : 

Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。

celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。

建议的消息代理RabbitMQ的,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, ,和数据库(使用SQLAlchemy的或Django的 ORM) 。

celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

安装:

pip install celery

对于消息的存储方案,可以用rabbitmq也可以用redis。 要过任务不是那么要命的话,我个人会用redis,毕竟这东西,学习成本很小的。

yum -y install rabbitmq-server

运行

rabbitmqctl  rabbitmq-env  rabbitmq-server

停止rabbitmq server的命令是

/usr/local/sbin/rabbitmqctl stop
rabbitmqctl  rabbitmq-env  rabbitmq-server

我们可以看看rabbitmq的状态:

要是喜欢用redis,那就需要安装 celery redis相应的模块 !

celery启动配置文件:

#coding:utf-8
import sys
import os
sys.path.insert(0, os.getcwd())
CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"

一个简单的测试,这个模块里面的内容也就是需要异步起来的任务。

from celery.task import task
import time
@task()
def add(x, y):
    time.sleep(5)
    return x + y

我们来测试下 ~


大家有注意那个False吗? 为啥会出现,因为我的tasks.py里面定义了让他sleep 5秒钟,我马上要数据,肯定是没有的,等5秒过了后,我再去提取数据,就可以了。


celery的参数众多的 ~

你可以等待结果来完成,但这不是因为它的异步调用同步使用

result.get(timeout=1)

咱们可以让他返回json串

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)

咱们再来说说redis的执行方式:

redis的方法
# tasks.py
import time
from celery import Celery
import os
celery = Celery('tasks', broker='redis://localhost:6379/0')
@celery.task
def osrun(good):
    reok=os.popen(good).read()
    return reok

启动celery

celery -A tasks worker --loglevel=info

总结:

以后有他,我再也不怕堵塞了。但是我对他的理解还是有限,后期再更新下高级方面的功能。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

电子邮件地址不会被公开。 必填项已用*标注