使用Disque做分布式消息队列支持重试和ack确认 …. ….
国际惯例,这里标记下文章的原文链接, http://xiaorui.cc/?p=1402 xiaorui.cc
xiaorui.cc
xiaorui.cc
xiaorui.cc
最近看群里有人在用Disque这个redis之父开发的一个分布式的任务消息队列。Redis现在新版已经支持一定程度的集群模式,但是还不太健全… 单一Disque结点也是只有一个结点的集群。
下面是Disque的功能的功能的详细描述… 【下面的一些描述,有些是参考各方的资料】
他是支持celery,rq这样的retry重试机制,次数自己可以自己控制。
消息需要消费者确认,很像rabbitmq那样,用户可以自己发送ack来给与服务端确认。
如果没有确认,会一直重发,一直重试到你配置的任务的过期时间。
队列是持久的,可以同步到硬盘上。
队列为了保证最大吞吐量,是乱序的。
在压力大的时候,消息不会丢弃,但会拒绝新的消息。
消费者和生产者可以通过命令查看队列中的消息。
队列尽力提供FIFO。
消息发送是事务性的,保证集群中会有所需数量的副本。
消息接收不是事务性的。
消费者默认是接收时是阻塞的,但也可以选择查看新消息。
生产者在队列满时发新消息可以得到错误信息,也可以让集群异步地复制消息。
支持延迟作业,粒度是秒,最久可以长达数年。但需要消耗内存。
消费者和生产者可以连接不同的结点。
这里介绍几个disque的几个命令….
addjob 增加一个任务到队列里面,附加的参数有很多,比如超时时间,操作的超时时间,延迟,重试的测试,TTL,大小…
ADDJOB queue_name job [ms-timeout] [REPLICATE ] [DELAY ] [RETRY ] [TTL ] [MAXLEN ] [ASYNC]
添加一个job,主要参数包括:
ms-timeout:操作超时时间,即从发起命令,到命令返回的时间,如果超过指定时间,返回超时错误,目前的实现由于包含顺序replication,网络环境出问题的话,很容易触发这个超时。
REPLICATE count:复制份数,妈蛋的,这个我也没怎么明白..
DELAY sec:延迟指定时间后,再放入到disque队列中。
RETRY sec:消息取走没有收到ack sec秒,那么重新把消息放回队列。
TTL sec:消息如果在队列中存在时间超过sec秒,就直接删除消息,无论有没有被消费, 他不同于ms-timeout。
MAXLEN count:消息队列最大长度。
ASYNC:采用异步方式操作。
主要过程描述:
1. 默认值:ttl:24小时,retry:-1,delay:0,replication:如果集群数目大于3,则为3,否则为集群数目。
2. 确认retry=0的时候,replication<1。
3. delay必须>=ttl
4. 如果没有指定retry,retry=ttl/10,如果ttl=0,retry=1.
5. 检查当前可达节点数目大于等于指定的复制数目。
6. 检查队列长度是否超出指定长度。
7. 创建job。
8. 分配ctime,用于实例内队列按照时间排序。
9. 设置delay或者retry时间用于调度。
10. 对于jobid冲突的情况,返回错误。
11. 添加消息进入队列,如果队列不存在,则自动创建。
12. 复制消息。
13. 对于异步消息,并且如果服务器目前内存容量不足,则在发送job到别的节点上之后,删除本地消息。
添加一个job后,client拿到的jobId,这个id的组成是,前两位采用关键字DI(不知道什么作用),之后8位用来标志生成这个id的实例,之后32位随机生成但和时间相关的字符串,之后4位是消息存活时间,用于在消息在指定时间内没有被消费的处理。最后加上关键字SQ。
长这个样子:DI | 0f0c644f | d3ccb51c2cedbd47fcb6f312646c993c | 05a0 | SQ
消息ID的作用,主要有以下几点:
每当生产者发送一个job,就会拿到这个id,可以用于生成者确认消息是否被正确消费。
每当消费者拿到一个job,就会附带这个id,用于消费者通知生成者或者disque消息已经被正确处理完毕。
getjob
GETJOB [TIMEOUT ] [COUNT ] FROM queue1 queue2 … queueN
从队列中拿出消息。
TIMEOUT:执行超时时间,如果超时,直接返回。
COUNT count:返回count个消息。
主要过程:
1. 默认:count:1
2. 从队列中取出消息。
3. 如果消息未命中,进入负载均衡阶段。
4. 从别的节点请求必要数量的消息。
5. 返回客户端。
返回的消息,附带队列名,消息以及jobID。
deljob
DELJOB jobid_1 jobid_2 … jobid_N
删除当前节点上指定的消息。
del只会操作当前节点,对其他节点不会操作,也不会通知。
主要过程:
1. 从回放队列中取出。
2. 删除job。
ackjob
ACKJOB jobid1 jobid2 … jobidN
用法很简单,直接加上jobid就ok。不需要别的。
这个命令用于确认消息已经被消费。
主要过程:
1. 如果不存在指定job,创建空的状态为JOB_STATE_ACKED的job。
2. 从队列中取出指定job(防止已经被回放的job被消费)。
3. 设置job状态为JOB_STATE_ACKED。
4. 从回放队列中取出。
5. 顺序发送ack消息到集群中别的所有节点,每个节点必须应答之后才算完成。
6. 应答消息的时候,回答节点必须确认处理完指定job之后才会应答。
ENQUEUE <job-id> … <job-id>
这是用来确保任务在队列里面。
fastack
FASTACK jobid_1 jobid_2 … jobid_N
用法和功能基本和ACKJOB是一样的,区别是,FASTACK不需要确认其他节点应答ACK消息就会返回,算是个异步的形态。。
disque的实现上,做法如下:在每个job里面,保存了一个属性retry时间,消息添加到队列中的时候,同时会加入到一个服务器调度队列中。每当消息从消息队列取出后,并不会马上从disque中删除,只是单纯从队列中取出,如果在指定(retry)时间没有得到消费完成的确认(ackjob)的话,消息就会被重新入队用于消费。
很明显会产生重复消费,retry时间是消费时间的一个预估,如果估计失败,可能导致消息会被反复消费,造成队列堆积。对于这个问题,redis的解决方案很简单,为job指定最长生存时间,如果达到这个时间,即使job没有被消费,也会被删除。(队列阻塞过久可能会导致消息丢失)
对于非幂等的操作,如何才能保证消费次数,就要靠下面介绍的“最多消费一次”了。
最多消费一次
相对于前一个,这一个承诺主要用于的场景是,消息不是幂等的,但可以丢消息———最多消费一次,也就是说,一次都不消费或者消费不完整也在承诺之内。
实现手段上,很简单,只要让前一个主题中提到的retry=0就可以,如果只是目测,感觉不到丢消息的气息,只会有之前提到的消息消费不完整的问题,但如果把宕机作为因素考虑,就可以明显看到,宕机会导致丢消息。
对于目前存在的不少问题,Antirez给出了一些应答,但目前没有实现。
性能问题。当前模式下,不对性能有任何承诺,需要结合具体生产环境的使用方式去优化。目前没必要和别的队列产品比较。
fastack。ack由于需要所有节点确认,是一个较大的瓶颈点,如果在实际使用中,发现用户并不关心消息的重复消费问题的话,这个实现可能有所改变。
单线程问题。redis的单线程模式对于redis这种数据结构服务可能比较适用,但是在队列的实现中,这个是没必要的,Antirez可能需要参考实际使用决策是否采用多线程实现。
落地以及消息容量问题。与redis一样,作为内存不落地消息队列,存量受限于内存。Antirez的思路是,当OOM的时候,落地消息到磁盘,等内存里面的消息消费完成之后,读取出磁盘上的消息入队。
addjob时候的广播问题。目前采用的是串行发送到目标实例,这里如果修改为并行的话,性能会好很多。
负载均衡的实现问题,当前的模型比较简单,对很多特殊负载处理不好,可能会在未来优化。
这里简单说下disque的安装教程
git clone https://github.com/antirez/disque.git cd disque/ make cd src/ make test
make test的时候会提示你的tcl版本,可以直接apt-get install tcl
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ make test
You need tcl 8.5 or newer in order to run the Disque test
make: *** [test] Error 1
执行文件在src目录下,直接跑就可以了.
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ ll|grep disque
-rwxr-xr-x 1 root root 2.2M May 10 14:14 disque*
-rwxr-xr-x 1 root root 29K May 10 14:14 disque-check-aof*
-rw-r–r– 1 root root 6.2K May 10 14:13 disque-check-aof.c
-rw-r–r– 1 root root 44K May 10 14:14 disque-check-aof.o
-rw-r–r– 1 root root 63K May 10 14:13 disque-cli.c
-rw-r–r– 1 root root 317K May 10 14:14 disque-cli.o
-rwxr-xr-x 1 root root 2.8M May 10 14:14 disque-server*
-rw-r–r– 1 root root 92K May 10 14:13 disque.c
-rw-r–r– 1 root root 38K May 10 14:13 disque.h
-rw-r–r– 1 root root 287K May 10 14:14 disque.o
-rw-r–r– 1 root root 2.3K May 10 14:13 disqueassert.h
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$
root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ make test
Starting disque #0 at port 25000
Starting disque #1 at port 25001
Starting disque #2 at port 25002
Starting disque #3 at port 25003
Starting disque #4 at port 25004
Starting disque #5 at port 25005
Starting disque #6 at port 25006
Testing unit: 00-base.tcl
14:20:13> (init) Restart killed instances: OK
14:20:13> Cluster nodes are reachable: OK
14:20:13> Cluster nodes hard reset: OK
14:20:13> Cluster Join and auto-discovery test:
/data/buzzMaster/disque/src
[ root@bj-buzz-docker01:/data/buzzMaster/disque/src {master} ]$ ./disque-server
14589:C 10 May 14:17:24.927 # Warning: no config file specified, using the default config. In order to specify a config file use ./disque-server /path/to/disque.conf
14589:P 10 May 14:17:24.928 * No cluster configuration found, I’m b1e8c8476075378ff8a97b116d977950128c4aa5
Disque 0.0.1 (a6de3813/0) 64 bit
_ –
. Port: 7711
. o . PID: 14589
.
– http://disque.io
14589:P 10 May 14:17:24.990 # Server started, Disque version 0.0.1
14589:P 10 May 14:17:24.990 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add ‘vm.overcommit_memory = 1’ to /etc/sysctl.conf and then reboot or run the command ‘sysctl vm.overcommit_memory=1’ for this to take effect.
14589:P 10 May 14:17:24.990 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
14589:P 10 May 14:17:24.990 * The server is now ready to accept connections on port 7711
另外看了下,关于disque的python包,居然有好几个….
Package Weight* Description
disque 0.11 13 Python client for disque
disque-py 0.0.0.post1 9 Python Disque client (very very alpha, as is Disque itself)
pydisque 0.1.1 6 disque client
disq 0.0.4 5 Python Disque client (very very alpha, as is Disque itself)
>> REPLICATE count:复制份数,妈蛋的,这个我也没怎么明白..这个就是复制因子,主要是做集群容错恢复用的
哈哈 ~ 好!