题外,因为抢火车票的原因,颇有些年味的感觉。值得庆幸的是成功的购票。
正题,跟朋友聊起现在的消息队列,虽然现在的消息队列服务端繁多,但因为业务的原因,往往就那几个选择。 要安全和功能? rabbitmq ,但他的性能确实一般。 zeromq性能是好,但他更偏向于tcp组件。kafka适合大吞吐的消息队列。 据我发现更多的人选择redis作为消息队列。 不少人吐槽redis队列实在简单? 那你要知道redis官方也只是把LIST作为nosql的一个数据类型而已。
我们social小组对于redis很是依赖,OPS操作数经常可以爆到3w+
文章写的不是太严谨,后期会不断更新修改补充该文章,标注下原文地址:
项目名
secure_queue
项目地址
https://github.com/rfyiamcool/secure_queue
用一句话来描述这项目,secure_queue可以让redis支持ACK(消息确认), 值得一用 !
为什么会做secure_queue?
平时很喜欢使用redis做MQ消息队列, 使用他的List类型结构实现队列, 简单、高效、性能极好的优点, 但是Redis相比rabbitmq、kafka没有消息确认(ack commit)的特性. 那么我通过实现一个服务来扩展redis的消息确认功能. 现在已经封装了server服务及python client模块,对于使用者来说很是易用的.
实现原理:
服务端是通用的,但现在客户端只有python一个版本,其他语言想用的话,只需要在入队的时候兼容下格式就可以了。 服务端是通过zset有序队列实现的到期时间判断,score分支为通知时间。
secure_queue主要分两部分:
secure_queue client
每次消费队列的时候,会往该队列的ack_queue有序队列扔任务, score为任务的最长超时时间.
secure_queue.py
用来维护ack_queue, 把符合条件的任务重新扔回任务队列 !
Future:
1. redis zset的value不能重复,解决方法在value加入uuid. (现在已经实现)
2. 加入重试次数控制.
使用方法:
启动redis
redis-server
启动secure_queue服务端
start_secure_queue.py
secure_queue的客户端测试代码
#coding:utf-8 #blog: xiaorui.cc #mail: rfyiamcool@163.com import random from mq import MessageQueue addr = { "host":"127.0.0.1", "port":6379, "queue":'queue', #队列名字 } def test_timeout(): r.rpush('rfyiamcool@163.com',timeout=5) def test_only(): r.rpush('http://github.com/rfyiamcool') def test_commit(): r.rpush('xiaorui.cc',timeout=5) r.commit('xiaorui.cc') def test_performance(): for i in range(10000): r.rpush(random.randrange(1,10000),timeout=10) r = MessageQueue(**addr) if __name__ == "__main__": test_only() test_commit() test_timeout() test_performance() print 'end...'
性能测试
secure_queue主要分为入队列和扫描ackqueue两个模块,虽然是python写的,但他本身没什么性能损耗,另外对于扫描ackqueue是支持多线程扫描的。 最后简单、暴力地测试性能也是可以一秒钟处理1w+的任务量.
扯淡, 大家有时间细心看下redis的数据结构,很多时候我们往往扫一眼就不再关注。