使用Disque做分布式消息队列支持重试和ack确认 …. ….

国际惯例,这里标记下文章的原文链接,   http://xiaorui.cc/?p=1402   xiaorui.cc

xiaorui.cc

xiaorui.cc

xiaorui.cc


最近看群里有人在用Disque这个redis之父开发的一个分布式的任务消息队列。Redis现在新版已经支持一定程度的集群模式,但是还不太健全… 单一Disque结点也是只有一个结点的集群。

下面是Disque的功能的功能的详细描述…   【下面的一些描述,有些是参考各方的资料】

他是支持celery,rq这样的retry重试机制,次数自己可以自己控制。 

消息需要消费者确认,很像rabbitmq那样,用户可以自己发送ack来给与服务端确认。 
如果没有确认,会一直重发,一直重试到你配置的任务的过期时间。 
队列是持久的,可以同步到硬盘上。 
队列为了保证最大吞吐量,是乱序的。 
在压力大的时候,消息不会丢弃,但会拒绝新的消息。
消费者和生产者可以通过命令查看队列中的消息。
队列尽力提供FIFO。
消息发送是事务性的,保证集群中会有所需数量的副本。
消息接收不是事务性的。
消费者默认是接收时是阻塞的,但也可以选择查看新消息。
生产者在队列满时发新消息可以得到错误信息,也可以让集群异步地复制消息。
支持延迟作业,粒度是秒,最久可以长达数年。但需要消耗内存。
消费者和生产者可以连接不同的结点。

这里介绍几个disque的几个命令….


addjob  增加一个任务到队列里面,附加的参数有很多,比如超时时间,操作的超时时间,延迟,重试的测试,TTL,大小…

ADDJOB queue_name job [ms-timeout] [REPLICATE ] [DELAY ] [RETRY ] [TTL ] [MAXLEN ] [ASYNC] 
添加一个job,主要参数包括: 
ms-timeout:操作超时时间,即从发起命令,到命令返回的时间,如果超过指定时间,返回超时错误,目前的实现由于包含顺序replication,网络环境出问题的话,很容易触发这个超时。 
REPLICATE count:复制份数,妈蛋的,这个我也没怎么明白..
DELAY sec:延迟指定时间后,再放入到disque队列中。 
RETRY sec:消息取走没有收到ack sec秒,那么重新把消息放回队列。
TTL sec:消息如果在队列中存在时间超过sec秒,就直接删除消息,无论有没有被消费, 他不同于ms-timeout。
MAXLEN count:消息队列最大长度。 
ASYNC:采用异步方式操作。 
主要过程描述: 
1. 默认值:ttl:24小时,retry:-1,delay:0,replication:如果集群数目大于3,则为3,否则为集群数目。 
2. 确认retry=0的时候,replication<1。 
3. delay必须>=ttl 
4. 如果没有指定retry,retry=ttl/10,如果ttl=0,retry=1. 
5. 检查当前可达节点数目大于等于指定的复制数目。 
6. 检查队列长度是否超出指定长度。 
7. 创建job。 
8. 分配ctime,用于实例内队列按照时间排序。 
9. 设置delay或者retry时间用于调度。 
10. 对于jobid冲突的情况,返回错误。 
11. 添加消息进入队列,如果队列不存在,则自动创建。 
12. 复制消息。 
13. 对于异步消息,并且如果服务器目前内存容量不足,则在发送job到别的节点上之后,删除本地消息。

添加一个job后,client拿到的jobId,这个id的组成是,前两位采用关键字DI(不知道什么作用),之后8位用来标志生成这个id的实例,之后32位随机生成但和时间相关的字符串,之后4位是消息存活时间,用于在消息在指定时间内没有被消费的处理。最后加上关键字SQ。

长这个样子:DI | 0f0c644f | d3ccb51c2cedbd47fcb6f312646c993c | 05a0 | SQ

消息ID的作用,主要有以下几点:

每当生产者发送一个job,就会拿到这个id,可以用于生成者确认消息是否被正确消费。

每当消费者拿到一个job,就会附带这个id,用于消费者通知生成者或者disque消息已经被正确处理完毕。

getjob

GETJOB [TIMEOUT ] [COUNT ] FROM queue1 queue2 … queueN 
从队列中拿出消息。

TIMEOUT:执行超时时间,如果超时,直接返回。 
COUNT count:返回count个消息。

主要过程: 
1. 默认:count:1 
2. 从队列中取出消息。 
3. 如果消息未命中,进入负载均衡阶段。 
4. 从别的节点请求必要数量的消息。 
5. 返回客户端。

返回的消息,附带队列名,消息以及jobID。

deljob

DELJOB jobid_1 jobid_2 … jobid_N 
删除当前节点上指定的消息。

del只会操作当前节点,对其他节点不会操作,也不会通知。

主要过程: 
1. 从回放队列中取出。 
2. 删除job。

ackjob

ACKJOB jobid1 jobid2 … jobidN 
用法很简单,直接加上jobid就ok。不需要别的。 
这个命令用于确认消息已经被消费。

主要过程: 
1. 如果不存在指定job,创建空的状态为JOB_STATE_ACKED的job。 
2. 从队列中取出指定job(防止已经被回放的job被消费)。 
3. 设置job状态为JOB_STATE_ACKED。 
4. 从回放队列中取出。 
5. 顺序发送ack消息到集群中别的所有节点,每个节点必须应答之后才算完成。 
6. 应答消息的时候,回答节点必须确认处理完指定job之后才会应答。

ENQUEUE <job-id> … <job-id>

这是用来确保任务在队列里面。

fastack

FASTACK jobid_1 jobid_2 … jobid_N 
用法和功能基本和ACKJOB是一样的,区别是,FASTACK不需要确认其他节点应答ACK消息就会返回,算是个异步的形态。。


disque的实现上,做法如下:在每个job里面,保存了一个属性retry时间,消息添加到队列中的时候,同时会加入到一个服务器调度队列中。每当消息从消息队列取出后,并不会马上从disque中删除,只是单纯从队列中取出,如果在指定(retry)时间没有得到消费完成的确认(ackjob)的话,消息就会被重新入队用于消费。

很明显会产生重复消费,retry时间是消费时间的一个预估,如果估计失败,可能导致消息会被反复消费,造成队列堆积。对于这个问题,redis的解决方案很简单,为job指定最长生存时间,如果达到这个时间,即使job没有被消费,也会被删除。(队列阻塞过久可能会导致消息丢失)

对于非幂等的操作,如何才能保证消费次数,就要靠下面介绍的“最多消费一次”了。

最多消费一次

相对于前一个,这一个承诺主要用于的场景是,消息不是幂等的,但可以丢消息———最多消费一次,也就是说,一次都不消费或者消费不完整也在承诺之内。

实现手段上,很简单,只要让前一个主题中提到的retry=0就可以,如果只是目测,感觉不到丢消息的气息,只会有之前提到的消息消费不完整的问题,但如果把宕机作为因素考虑,就可以明显看到,宕机会导致丢消息。


对于目前存在的不少问题,Antirez给出了一些应答,但目前没有实现。

性能问题。当前模式下,不对性能有任何承诺,需要结合具体生产环境的使用方式去优化。目前没必要和别的队列产品比较。

fastack。ack由于需要所有节点确认,是一个较大的瓶颈点,如果在实际使用中,发现用户并不关心消息的重复消费问题的话,这个实现可能有所改变。

单线程问题。redis的单线程模式对于redis这种数据结构服务可能比较适用,但是在队列的实现中,这个是没必要的,Antirez可能需要参考实际使用决策是否采用多线程实现。

落地以及消息容量问题。与redis一样,作为内存不落地消息队列,存量受限于内存。Antirez的思路是,当OOM的时候,落地消息到磁盘,等内存里面的消息消费完成之后,读取出磁盘上的消息入队。

addjob时候的广播问题。目前采用的是串行发送到目标实例,这里如果修改为并行的话,性能会好很多。

负载均衡的实现问题,当前的模型比较简单,对很多特殊负载处理不好,可能会在未来优化。

这里简单说下disque的安装教程

make test的时候会提示你的tcl版本,可以直接apt-get install tcl
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ make test
You need tcl 8.5 or newer in order to run the Disque test
make: *** [test] Error 1

执行文件在src目录下,直接跑就可以了. 

[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ ll|grep disque
-rwxr-xr-x 1 root root 2.2M May 10 14:14 disque*
-rwxr-xr-x 1 root root  29K May 10 14:14 disque-check-aof*
-rw-r–r– 1 root root 6.2K May 10 14:13 disque-check-aof.c
-rw-r–r– 1 root root  44K May 10 14:14 disque-check-aof.o
-rw-r–r– 1 root root  63K May 10 14:13 disque-cli.c
-rw-r–r– 1 root root 317K May 10 14:14 disque-cli.o
-rwxr-xr-x 1 root root 2.8M May 10 14:14 disque-server*
-rw-r–r– 1 root root  92K May 10 14:13 disque.c
-rw-r–r– 1 root root  38K May 10 14:13 disque.h
-rw-r–r– 1 root root 287K May 10 14:14 disque.o
-rw-r–r– 1 root root 2.3K May 10 14:13 disqueassert.h
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$

 root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ make test
Starting disque #0 at port 25000
Starting disque #1 at port 25001
Starting disque #2 at port 25002
Starting disque #3 at port 25003
Starting disque #4 at port 25004
Starting disque #5 at port 25005
Starting disque #6 at port 25006
Testing unit: 00-base.tcl
14:20:13> (init) Restart killed instances: OK
14:20:13> Cluster nodes are reachable: OK
14:20:13> Cluster nodes hard reset: OK
14:20:13> Cluster Join and auto-discovery test:

/data/buzzMaster/disque/src
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ ./disque-server
14589:C 10 May 14:17:24.927 # Warning: no config file specified, using the default config. In order to specify a config file use ./disque-server /path/to/disque.conf
14589:P 10 May 14:17:24.928 * No cluster configuration found, I’m b1e8c8476075378ff8a97b116d977950128c4aa5
                                        Disque 0.0.1 (a6de3813/0) 64 bit
          _ -
        .                               Port: 7711
        .    o    .                     PID: 14589
                 .
               -                              http://disque.io

14589:P 10 May 14:17:24.990 # Server started, Disque version 0.0.1
14589:P 10 May 14:17:24.990 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add ‘vm.overcommit_memory = 1′ to /etc/sysctl.conf and then reboot or run the command ‘sysctl vm.overcommit_memory=1′ for this to take effect.
14589:P 10 May 14:17:24.990 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
14589:P 10 May 14:17:24.990 * The server is now ready to accept connections on port 7711

另外看了下,关于disque的python包,居然有好几个….



Package Weight* Description
disque 0.11 13  Python client for disque
disque-py 0.0.0.post1   9   Python Disque client (very very alpha, as is Disque itself)
pydisque 0.1.1  6   disque client
disq 0.0.4  5   Python Disque client (very very alpha, as is Disque itself)



对Python及运维开发感兴趣的朋友可以加QQ群 : 478476595 !!!
{ 2000人qq大群内有各厂大牛,常组织线上分享及沙龙,对高性能及分布式场景感兴趣同学欢迎加入该QQ群 }

另外如果大家觉得文章对你有些作用!   帮忙点击广告. 一来能刺激我写博客的欲望,二来好维护云主机的费用.
如果想赏钱,可以用微信扫描下面的二维码. 另外再次标注博客原地址  xiaorui.cc  ……   感谢!
暂无相关产品

2则回应给“使用Disque做分布式消息队列支持重试和ack确认”

  1. 虚风竹叶说道:

    >> REPLICATE count:复制份数,妈蛋的,这个我也没怎么明白..这个就是复制因子,主要是做集群容错恢复用的

发表评论