兼容go redis cluster的pipeline批量

前言:

     redis cluster集群机制是不错,但因为是smart client设计,没有proxy中间层,导致很多redis批量命令在不同slot时不能适配,比如 mset、mget、pipeline等。 该篇文章讲述了redis cluster multi key批量操作的一些解决方案,尤其是golang的场景下。  

    该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5557

老生常谈的redis cluster概念

    Redis Cluster在设计中没有使用一致性哈希(Consistency Hashing),而是使用数据分片(Sharding)引入哈希槽(hash slot)来实现。一个 Redis Cluster包含16384(0~16383)个哈希槽,存储在Redis Cluster中的所有键都会被映射到这些slot中,集群中的每个键都属于这16384个哈希槽中的一个,集群使用公式slot=CRC16 key/16384来计算key属于哪个槽。

    集群中的每个主节点(Master)都负责处理16384个哈希槽中的一部分,当集群处于稳定状态时,每个哈希槽都只由一个主节点进行处理,每个主节点可以有一个到N个从节点。当主节点出现宕机或网络断线等不可用时,从节点能自动提升为主节点进行处理。


问题

不同slot区间的key可能在不同的节点上,如果在一个节点上执行不属于他slot区间的key会发生什么? 下面是可以会出现的error信息, redis cluster返回的错误很多,但核心异常信息不属于一个slot。

// xiaorui.cc

all keys must map to the same key slot
ERR CROSSSLOT Keys in request don't hash to the same slot

bad lua script for redis cluster, all the keys that the script uses should be passed using the KEYS array
eval/evalsha command keys must in same slot

redis cluster是个smart client设计模式,客户端会存有redis cluster的分片和节点关系的缓存。当你使用multi key的redis命令时 (mset、mget ),client通常会选择参数的第一个key的slot的主机进行发送。如果参数的多个key在不同的slot,在不同的主机上,那么必然会出错。 

解决方法

第一种:
最简单的方法是在所有的执行key前面加入hashtag,让他的key都在一个slot上。这样你的程序和架构简单了,带来的问题是数据倾斜,后面可能内存受限、qps受限。
第二种:

使用proxy方案做适配redis cluster集群, 像codis。滴滴、12306都在大量使用该代理。

第三种:
在客户端上实现命令的slot分组,然后分别并发处理。

第一种和第二种方法没什么说的,具体说下客户端怎么搞?先前有说过,客户端会缓存slot及node主机的关系,我们在客户端上根据参数keys做slot分离不就行了。像python的redis-py库包已帮你做了redis cluster的适配,对于上层代码无感知。golang其实也有这样的库,这里可以参考下 github.com/chasex/redis-go-cluster 代码, 这个是基于gomodule/redigo封装的redis集群客户端库。

redis-go-cluster处理入口, 劫持了MSET、MSETNX、MGET批量命令,根据slot分到不同的slice里,然后针对多个slice进行并发请求。

// xiaorui.cc

func (cluster *Cluster) Do(cmd string, args ...interface{}) (interface{}, error) {
    if len(args) < 1 {
    return nil, fmt.Errorf("Do: no key found in args")
    }

    if cmd == "MSET" || cmd == "MSETNX" {
    return cluster.multiSet(cmd, args...)
    }

    if cmd == "MGET" {
    return cluster.multiGet(cmd, args...)
    }
    ...
}


func (cluster *Cluster) multiSet(cmd string, args ...interface{}) (interface{}, error) {
    ...
    // 命令分组
    tasks := make([]*multiTask, 0)

    cluster.rwLock.RLock()
    for i := 0; i < len(args); i += 2 {
        key, err := key(args[i])
        if err != nil {
            cluster.rwLock.RUnlock()
            return nil, fmt.Errorf("multiSet: invalid key %v", args[i])
        }

        slot := hash(key)

        var j int
        for j = 0; j < len(tasks); j++ {
            // 相同的slot的key放在一个multiTask的slice里。
            if tasks[j].slot == slot {
                tasks[j].args = append(tasks[j].args, args[i])   // key
                tasks[j].args = append(tasks[j].args, args[i+1]) // value

                break
            }
        }

        if j == len(tasks) {
            node := cluster.slots[slot]
            if node == nil {
                cluster.rwLock.RUnlock()
                return nil, fmt.Errorf("multiSet: %s[%d] no node found", key, slot)
            }

            task := &multiTask{
                node: node,
                slot: slot,
                cmd:  cmd,
                args: []interface{}{args[i], args[i+1]},
                done: make(chan int),
            }
            tasks = append(tasks, task)
        }
    }
    cluster.rwLock.RUnlock()

    // 每个slot队列为为一个并发
    for i := range tasks {
        go handleSetTask(tasks[i])
    }

    // 确定都执行完
    for i := range tasks {
        <-tasks[i].done
    }

    for i := range tasks {
        _, err := String(tasks[i].reply, tasks[i].err)
        if err != nil {
            return nil, err
        }
    }

    return "OK", nil
}

type multiTask struct {
    node *redisNode
    slot uint16

    cmd  string
    args []interface{}

    reply   interface{}
    replies []interface{}
    err     error

    done chan int
}

下面是redis cluster pipeline批量实现, 跟mset mget的实现差不都,都是通过算出key的slot放在不同的slice里面,继而并发的使用pipeline。

// xiaorui.cc

// NewBatch create a new batch to pack mutiple commands.
func (cluster *Cluster) NewBatch() *Batch {
    return &Batch{
        cluster: cluster,
        batches: make([]nodeBatch, 0),
        index:   make([]int, 0),
    }
}

// Put add a redis command to batch, DO NOT put MGET/MSET/MSETNX.
func (batch *Batch) Put(cmd string, args ...interface{}) error {
    if len(args) < 1 {
        return fmt.Errorf("Put: no key found in args")
    }

    if cmd == "MGET" || cmd == "MSET" || cmd == "MSETNX" {
        return fmt.Errorf("Put: %s not supported", cmd)
    }

    node, err := batch.cluster.getNodeByKey(args[0])
    if err != nil {
        return fmt.Errorf("Put: %v", err)
    }

    var i int
    for i = 0; i < len(batch.batches); i++ {
        if batch.batches[i].node == node {
            batch.batches[i].cmds = append(batch.batches[i].cmds,
                nodeCommand{cmd: cmd, args: args})

            batch.index = append(batch.index, i)
            break
        }
    }
    ...
}

func (cluster *Cluster) RunBatch(bat *Batch) ([]interface{}, error) {
    for i := range bat.batches {
        go doBatch(&bat.batches[i])
    }

    for i := range bat.batches {
        <-bat.batches[i].done
    }

    var replies []interface{}
    for _, i := range bat.index {
        if bat.batches[i].err != nil {
            return nil, bat.batches[i].err
        }

        replies = append(replies, bat.batches[i].cmds[0].reply)
        bat.batches[i].cmds = bat.batches[i].cmds[1:]
    }

    return replies, nil
}

func doBatch(batch *nodeBatch) {
    conn, err := batch.node.getConn()
    ...

    for i := range batch.cmds {
        conn.send(batch.cmds[i].cmd, batch.cmds[i].args...)
    }

    err = conn.flush()
    ...

    for i := range batch.cmds {
        reply, err := conn.receive()
        if err != nil {
            batch.err = err
            conn.shutdown()
            batch.done <- 1
            return
        }

        batch.cmds[i].reply, batch.cmds[i].err = reply, err
    }

    ...
}

go-redis这个库也实现了批量key的匹配,下面是go-redis pipieline的源码。 https://github.com/go-redis

// xiaorui.cc

func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
        ...
	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}
		cmdsMap := map[*clusterNode][]Cmder{node: cmds}

		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				time.Sleep(c.retryBackoff(attempt))
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					cn, err := node.Client.getConn()
					if err != nil {
						if err == pool.ErrClosed {
							c.mapCmdsByNode(cmds, failedCmds)
						} else {
							setCmdsErr(cmds, err)
						}
						return
					}

					err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
					node.Client.releaseConnStrict(cn, err)
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}


总结:

     推荐大家在使用redis cluster时也使用批量模式,这样对于减少网络IO延迟很有效果。pipeline还可以减少syscall消耗,毕竟数据合并了。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc