golang redis pipeline管道引发乱序串读

前言

大早通过监控系统发现有些 api 接口时延抖动,比以前时延都要高,再通过日志得知由于上层业务的变动,导致这边会往 redis 请求近百条命令。由于其他业务也通过 redis 来访问共享数据,故而 redis 的数据结构暂无法做变更。业务那边的逻辑暂时也无法优化,就只能先硬抗了。

一句话,历史遗留问题

如何优化这个时延问题,可以临时通过 redis pipeline 批量管道来优化时延。

但在使用管道时带来了一个问题,看下文。

问题

先说下发生了什么问题?

在使用 golang redigo pipeline 模式下,错误使用会引发乱序串读的问题。简单说,发了一组 pipeline命 令,但由于只发送而没有去解析接收结果,那么后面通过连接池重用该连接时,会拿到了上次的请求结果,乱序串读了。

redigo 的这个问题我是知道的,只是时间长忘记了,毕竟好久就改用 go-redis。文章中描述的问题只有 redigo 里存在,为什么不采用 go-redis ? 别问,问就是一坨历史代码。项目中跟redis 交互的函数就有 350 多个,坨坨的逻辑轻易没人碰。😅

下面是可以复现问题的代码,注意看注释。

func TestMultiPipeline(t *testing.T) {
    c, err := redis.Dial("tcp", "172.16.0.46:6379")
    assert.Equal(&testing.T{}, err, nil)
    defer c.Close()

    s := time.Now()
    for i := 0; i < 100; i++ {
        c.Send("set", "k1", "k1")
        c.Send("get", "k1")
        c.Send("set", "k2", "k2")
        c.Send("set", "k3", "k3")
        c.Flush()
        c.Receive()
        c.Receive()
        c.Receive()
        c.Receive()
        assert.Equal(&testing.T{}, err, nil)
    }
    cost := time.Since(s)
    t.Log("pipe totol cost: ", cost)

    c.Send("get", "k1")
    c.Send("get", "k2")
    c.Flush()

    c.Send("get", "k2")
    c.Flush()
    k1, err := redis.String(c.Receive())
    assert.Equal(t, err, nil)
    assert.Equal(t, "k1", k1)  // 拿到的是上一波的返回数据

    c.Send("get", "k3")
    c.Flush()
    k2, err := redis.String(c.Receive())
    assert.Equal(t, err, nil)
    assert.Equal(t, "k2", k2)  // 拿到的是上上一个波的返回数据
}

go redigo pipeline 代码解析

看源码可得知,Flush() 只是把buffer缓冲区的数据写到连接里,而没有从连接读取的过程。所以说,在redigo的pipeline里,有几次的写,就应该有几次的 Receive() 。Receive是从连接读缓冲区里读取解析数据。

receive() 是不可或缺的! 不能多,也不能少,每个 send() 都对应一个 receive()。

如果多了,那么就会阻塞,为啥会阻塞 ?他会尝试从 conn 里读取返回数据,但问题已经没有数据可以等待返回了。

如果少了,那么就会造成文章中的 case,串写问题。

话说,文章中提及的问题本不应该要我们去处理,毕竟要上层业务放来小心的控制 receive() 着实太恶心了,更应该督促 redigo 作者去增强完善的功能,然而没用,作者现在已不想改这个库了,毕竟他的竞品 go-redis 更优秀,对外方法都实现了命令抽象,自带集群协议,命令封装在连接池内等等。

func (c *conn) Send(cmd string, args ...interface{}) error {
    c.mu.Lock()
    c.pending += 1
    c.mu.Unlock()
    if c.writeTimeout != 0 {
        c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
    }
    if err := c.writeCommand(cmd, args); err != nil {
        return c.fatal(err)
    }
    return nil
}

func (c *conn) Flush() error {
    if c.writeTimeout != 0 {
        c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
    }
    if err := c.bw.Flush(); err != nil {
        return c.fatal(err)
    }
    return nil
}

func (c *conn) Receive() (interface{}, error) {
    return c.ReceiveWithTimeout(c.readTimeout)
}

func (c *conn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
    var deadline time.Time
    if timeout != 0 {
        deadline = time.Now().Add(timeout)
    }
    c.conn.SetReadDeadline(deadline)

    if reply, err = c.readReply(); err != nil {
        return nil, c.fatal(err)
    }
    // When using pub/sub, the number of receives can be greater than the
    // number of sends. To enable normal use of the connection after
    // unsubscribing from all channels, we do not decrement pending to a
    // negative value.
    //
    // The pending field is decremented after the reply is read to handle the
    // case where Receive is called before Send.
    c.mu.Lock()
    if c.pending > 0 {
        c.pending -= 1
    }
    c.mu.Unlock()
    if err, ok := reply.(Error); ok {
        return nil, err
    }
    return
}

解决

上面其实已经说了答案,要么按部就班的使用 receive() ,不多不少的调用;要么直接使用完美的 go-redis 库。

总结

使用了 pipeline 批量确实有效的减少了时延,也减少了 redis 压力。不要再去使用 golang redigo 这个库了,请直接选择 go-redis 库。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc