golang redis pipeline管道引发乱序串读

golang redis pipeline管道引发乱序串读

前言

大早通过监控系统发现有些 api 接口时延抖动,比以前时延都要高,再通过日志得知由于上层业务的变动,导致这边会往 redis 请求近百条命令。由于其他业务也通过 redis 来访问共享数据,故而 redis 的数据结构暂无法做变更。业务那边的逻辑暂时也无法优化,就只能先硬抗了。

一句话,历史遗留问题

如何优化这个时延问题,可以临时通过 redis pipeline 批量管道来优化时延。

但在使用管道时带来了一个问题,看下文。

问题

先说下发生了什么问题?

在使用 golang redigo pipeline 模式下,错误使用会引发乱序串读的问题。简单说,发了一组 pipeline命 令,但由于只发送而没有去解析接收结果,那么后面通过连接池重用该连接时,会拿到了上次的请求结果,乱序串读了。

redigo 的这个问题我是知道的,只是时间长忘记了,毕竟好久就改用 go-redis。文章中描述的问题只有 redigo 里存在,为什么不采用 go-redis ? 别问,问就是一坨历史代码。项目中跟redis 交互的函数就有 350 多个,坨坨的逻辑轻易没人碰。😅

下面是可以复现问题的代码,注意看注释。

func TestMultiPipeline(t *testing.T) {
    c, err := redis.Dial("tcp", "172.16.0.46:6379")
    assert.Equal(&testing.T{}, err, nil)
    defer c.Close()

    s := time.Now()
    for i := 0; i < 100; i++ {
        c.Send("set", "k1", "k1")
        c.Send("get", "k1")
        c.Send("set", "k2", "k2")
        c.Send("set", "k3", "k3")
        c.Flush()
        c.Receive()
        c.Receive()
        c.Receive()
        c.Receive()
        assert.Equal(&testing.T{}, err, nil)
    }
    cost := time.Since(s)
    t.Log("pipe totol cost: ", cost)

    c.Send("get", "k1")
    c.Send("get", "k2")
    c.Flush()

    c.Send("get", "k2")
    c.Flush()
    k1, err := redis.String(c.Receive())
    assert.Equal(t, err, nil)
    assert.Equal(t, "k1", k1)  // 拿到的是上一波的返回数据

    c.Send("get", "k3")
    c.Flush()
    k2, err := redis.String(c.Receive())
    assert.Equal(t, err, nil)
    assert.Equal(t, "k2", k2)  // 拿到的是上上一个波的返回数据
}

go redigo pipeline 代码解析

看源码可得知,Flush() 只是把buffer缓冲区的数据写到连接里,而没有从连接读取的过程。所以说,在redigo的pipeline里,有几次的写,就应该有几次的 Receive() 。Receive是从连接读缓冲区里读取解析数据。

receive() 是不可或缺的! 不能多,也不能少,每个 send() 都对应一个 receive()。

如果多了,那么就会阻塞,为啥会阻塞 ?他会尝试从 conn 里读取返回数据,但问题已经没有数据可以等待返回了。

如果少了,那么就会造成文章中的 case,串写问题。

话说,文章中提及的问题本不应该要我们去处理,毕竟要上层业务放来小心的控制 receive() 着实太恶心了,更应该督促 redigo 作者去增强完善的功能,然而没用,作者现在已不想改这个库了,毕竟他的竞品 go-redis 更优秀,对外方法都实现了命令抽象,自带集群协议,命令封装在连接池内等等。

func (c *conn) Send(cmd string, args ...interface{}) error {
    c.mu.Lock()
    c.pending += 1
    c.mu.Unlock()
    if c.writeTimeout != 0 {
        c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
    }
    if err := c.writeCommand(cmd, args); err != nil {
        return c.fatal(err)
    }
    return nil
}

func (c *conn) Flush() error {
    if c.writeTimeout != 0 {
        c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
    }
    if err := c.bw.Flush(); err != nil {
        return c.fatal(err)
    }
    return nil
}

func (c *conn) Receive() (interface{}, error) {
    return c.ReceiveWithTimeout(c.readTimeout)
}

func (c *conn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
    var deadline time.Time
    if timeout != 0 {
        deadline = time.Now().Add(timeout)
    }
    c.conn.SetReadDeadline(deadline)

    if reply, err = c.readReply(); err != nil {
        return nil, c.fatal(err)
    }
    // When using pub/sub, the number of receives can be greater than the
    // number of sends. To enable normal use of the connection after
    // unsubscribing from all channels, we do not decrement pending to a
    // negative value.
    //
    // The pending field is decremented after the reply is read to handle the
    // case where Receive is called before Send.
    c.mu.Lock()
    if c.pending > 0 {
        c.pending -= 1
    }
    c.mu.Unlock()
    if err, ok := reply.(Error); ok {
        return nil, err
    }
    return
}

解决

上面其实已经说了答案,要么按部就班的使用 receive() ,不多不少的调用;要么直接使用完美的 go-redis 库。

总结

使用了 pipeline 批量确实有效的减少了时延,也减少了 redis 压力。不要再去使用 golang redigo 这个库了,请直接选择 go-redis 库。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc