前言:
前两天由于某几个厂商的api出问题,导致后台任务大量堆积,又因为我这边任务流系统会重试超时任务,所以导致队列中有大量的重复任务。这时候我们要临时解决两个事情,一件事情,让一些高质量的任务优先执行; 另一件事情, 要有去重。 rabbitmq不能很好的针对这类情况去重、分优先级。
这时候我又想到了我最爱的redis… 去重? list + set 就可以解决, 优先级,zset + zrange + zrem 也可以解决… 但问题这几个命令非原子,那么怎么让他们原子? 写模块 or redis lua script . 首先在 redis 4.x 写了个简单的module,但写完了发现一件颇为重要的事情,我们线上的是3.2 …. 然后又花了点时间改成redis lua的版本。项目本身的功能实现很简单,复杂的是创意 !!!
项目名: redis_unique_queue, 项目地址,https://github.com/rfyiamcool/redis_unique_queue
该文章后续会有更新, 原文地址, http://xiaorui.cc/?p=4828
主要功能介绍:
使用redis lua script 封装的去重及优先级队列方法, 达到了组合命令的原子性和节省来往的io请求的目的.
去重队列:
不仅能保证FIFO, 而且去重.
优先级去重队列:
按照优先级获取任务, 并且去重.
使用方法:
# xiaorui.cc PriorityQueue NewPriorityQueue(priority int, unique bool, r *redis.Pool) Push(q string, body string, pri int) (int, error) Pop(q string) (resp string, err error) UniqueQueue NewUniqueQueue(r *redis.Pool) *UniqueQueue UniquePush(q string, body string) (int, error) UniquePop(q string) (resp string, err error) more..
下面是优先级去重队列的例子:
package main // xiaorui.cc import ( "fmt" "github.com/rfyiamcool/redis_unique_queue" ) func main() { fmt.Println("start") redis_client_config := unique_queue.RedisConfType{ RedisPw: "", RedisHost: "127.0.0.1:6379", RedisDb: 0, RedisMaxActive: 100, RedisMaxIdle: 100, RedisIdleTimeOut: 1000, } redis_client := unique_queue.NewRedisPool(redis_client_config) qname := "xiaorui.cc" body := "message from xiaorui.cc" u := unique_queue.NewPriorityQueue(3, true, redis_client) // 3: 3个优先级,从1-3级 // true: 开启unique set u.Push(qname, body, 2) // 2, 优先级 fmt.Println(u.Pop(qname)) }
单单使用 去重队列的例子:
package main import ( "fmt" "github.com/rfyiamcool/redis_unique_queue" ) func main() { fmt.Println("start") redis_client_config := unique_queue.RedisConfType{ RedisPw: "", RedisHost: "127.0.0.1:6379", RedisDb: 0, RedisMaxActive: 100, RedisMaxIdle: 100, RedisIdleTimeOut: 1000, } redis_client := unique_queue.NewRedisPool(redis_client_config) qname := "xiaorui.cc" u := unique_queue.NewUniqueQueue(redis_client) for i := 0; i < 100; i++ { u.UniquePush(qname, "body...") } fmt.Println(u.Length(qname)) for i := 0; i < 100; i++ { u.UniquePop(qname) } fmt.Println(u.Length(qname)) fmt.Println("end") }
需要改进地址也是很多, 比如 加入批量操作, 对于redis连接池引入方法改进等.
END.