python使用pydisque实现操作disque消息队列

关于disque的介绍,我这里就不多啰嗦了,以前写过disque的介绍,大家可以看看比较详细的介绍及安装.

http://xiaorui.cc/?p=1402

我这边的主要开发是用python …. …. disque是个比较奇特的消息队列,东西不大,但是精而全,单纯做队列,要比redis强大的。 disque安装很是简单,直接 pip install pydisque就可以了… 

关于disque的文章,原文出处是,  http://xiaorui.cc   http://xiaorui.cc/?p=1411

我们首先需要建立disque连接对象…是可以多加几个node节点的

from pydisque.client import Client
client = Client(["127.0.0.1:7711", "127.0.0.1:7712", "127.0.0.1:7713"])
client.connect()

连接创立完毕,现在我们就增加一个任务.. 

c.add_job("test_queue", json.dumps(["print", "hello", "world", time.time()]), timeout=100)

Then,下面是消费者的代码… 

while True:
    jobs = c.get_job(['test'])
    for queue_name, job_id, job in jobs:
        job = json.loads(job)
        print "received job:", job
        c.ack_job(job_id)

增加一个任务,附加的参数如下,超时,延迟,超时,重试等….

add_job(queue_name, job, timeout=200, replicate=None, delay=None, retry=None, ttl=None, maxlen=None, async=None)[source]


ADDJOB queue_name job <ms-timeout> [REPLICATE <count>] [DELAY <sec>] [RETRY <sec>] [TTL <sec>] [MAXLEN <count>] [ASYNC]

参数说明: 
        queue_name – is the name of the queue, any string, basically.
        job – is a string representing the job.
        timeout – is the command timeout in milliseconds.
        replicate – count is the number of nodes the job should be replicated to.
        delay – sec is the number of seconds that should elapse before the job is queued by any server.
        retry – sec period after which, if no ACK is received, the job is put again into the queue for delivery. If RETRY is 0, the job has an at-least-once delivery semantics.
        ttl – sec is the max job life in seconds. After this time, the job is deleted even if it was not successfully delivered.
        maxlen – count specifies that if there are already count messages queued for the specified queue name, the message is refused and an error reported to the client.
        async – asks the server to let the command return ASAP and replicate the job to other nodes in the background. The job gets queued ASAP, while normally the job is put into the queue only when the client gets a positive reply.
        Returns:    
        job_id


获取任务,get_job… … 可以一次性获取多个,可以加timeout参数
get_job(queues, timeout=None, count=None)[source]
GETJOB [TIMEOUT <ms-timeout>] [COUNT <count>] FROM queue1 queue2 … queueN

查看队列的大小
qlen(queue_name)[source]
QLEN <qname>

获取数据
qpeek(queue_name, count)[source]
QPEEK <qname> <count>

会返回任务情况和jobid

ack_job(*job_ids)[source]
返回job的状态给disque服务端。 

show(job_id)[source]

可以详细的通过job_id来显示任务信息

del_job(*job_ids)[source]
删除任务

ENQUEUE
ENQUEUE <job-id> … <job-id>
如果给定的任务尚未被放入到队列里面, 那么把它们放入到队列里面。

DEQUEUE <job-id> … <job-id>
从队列里面移除指定的任务。


这两天我会做个disque的压力测试,最主要是看看能不能承受百万的队列,然后插入取出的性能….


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注