解决golang redis连接池的io异常BUG?

前言 

     golang redis连接的一个bug?为什么是带个疑问问号?  因为我也不确定这是否算一个bug,但不管是go呀,lua呀,python呀,绝大数的连接池的构建不仅仅会用来复用连接,而且会针对异常io的连接进行重试。 该文章后续仍在不断的更新修改中, 请移步到原文地址  http://xiaorui.cc/?p=5513


     golang的redis库出名的就两个,一个是gomodule的redigo,另一个是go-redis的redis库。这里说有连接池使用问题的redis库是gomodule,而go-redis没有这个问题。

     简单说,当你获取了从连接吃获取了一个连接,但是这时候连接中断了,你再去使用该连接肯定是有问题的了。go-redis会根据error的信息做连接的重试,而gomodule的redis就不管不问了。曾经在社区问过,给的回复是小概率事件,如果想做重试需要在调用方做判断。

redis pool源码分析

要分析问题,当然要看redis pool的源码了。 我们看下get方法,  当连接池的IdleTimeout 大于 0,会触发一次空闲连接的整理,这里的空闲连接整理也是被动的,当你触发get()的时候,才会去触发一次。每次触发会轮询所有的client对象。 当你的idle.front链表不为空,那么尝试去拿一个连接,如果绑定了TestOnBorrow自定义方法,那么进行检测连接是否可用。 

后面我们会具体说明下,TestOnBorrow 其实并不能完全解决io异常问题。

// xiaorui.cc

func (p *Pool) get(ctx interface {
    Done() <-chan struct{}
    Err() error
}) (*poolConn, error) {

    ...

    p.mu.Lock()

    // Prune stale connections at the back of the idle list.
    if p.IdleTimeout > 0 {
        n := p.idle.count
        for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.idle.back
            p.idle.popBack()
            p.mu.Unlock()
            pc.c.Close()
            p.mu.Lock()
            p.active--
        }
    }

    // Get idle connection from the front of idle list.
    for p.idle.front != nil {
        pc := p.idle.front
        p.idle.popFront()
        p.mu.Unlock()
        if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
            (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
            return pc, nil
        }
        pc.c.Close()
        p.mu.Lock()
        p.active--
    }

    // Check for pool closed before dialing a new connection.
    if p.closed {
        p.mu.Unlock()
        return nil, errors.New("redigo: get on closed pool")
    }

    // Handle limit for p.Wait == false.
    if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
        p.mu.Unlock()
        return nil, ErrPoolExhausted
    }
    ...
    return &poolConn{c: c, created: nowFunc()}, err
}

再来看下redis pool释放连接到连接池的put方法, 没什么好说的,就是把连接对象返回给连接池,更改active计数就完了。

// xiaorui.cc

func (p *Pool) put(pc *poolConn, forceClose bool) error {
	p.mu.Lock()
	if !p.closed && !forceClose {
		pc.t = nowFunc()
		p.idle.pushFront(pc)
		if p.idle.count > p.MaxIdle {
			pc = p.idle.back
			p.idle.popBack()
		} else {
			pc = nil
		}
	}

	if pc != nil {
		p.mu.Unlock()
		pc.c.Close()
		p.mu.Lock()
		p.active--
	}

        ...
	p.mu.Unlock()
	return nil
}

TestOnBorrow是我们创建redis连接池的时候注册的回调方法。当我们每次从连接池获取连接的时候,都会调用这个方法一次。

你可以这么用,每次都用ping pong来探测连接的可用,但每个操作都占用RTT,加大业务的延迟消耗,虽然内网下redis单次操作在100us左右。

// xiaorui.cc

    TestOnBorrow: func(c redis.Conn, t time.Time) error {
        _, err := c.Do("PING")
        if nil != err {
            WxReport("redis ping error:"+err.Error(), "error")
        }
        return err
    },

回调TestOnBorrow的时候,会传递给你连接对象和上次的时间,你可以一分钟检验一次。

// xiaorui.cc

    TestOnBorrow: func(c redis.Conn, t time.Time) error {
        if time.Since(t) < time.Minute {
            return nil
        }
        _, err := c.Do("PING")
        if nil != err {
            WxReport("redis ping error:"+err.Error(), "error")
        }
        return err
    },

我们上面有说过 TestOnBorrow 不能完全解决连接io异常的问题? 我们设想一下,当我pop一个连接的时候,TestOnBorrow帮我测试连接是可用的,但是探测完了后,连接中断了,这时候我去使用自然就异常了。 

大家会觉得这个概率不大,但我们遇到了几次,简单说下有两个场景。

第一个:

我们配置了超时是3600s,redis server空闲连接超时配置了600s,redis server会在我们之前把连接给关了,该redis client自然就没法用了,使用redis操作的业务逻辑自然就走不下去了,难道让我的代码都写判断,再来一次连接获取?

第二个:

同事的一个bug,  已经从pool里获取了连接,但是业务逻辑特别的繁杂,可能在一分钟后才会使用。但用之前redis做了重启呀,升级呀,又引起redis client异常了。

简单说,哪怕TestOnBorrow是每次都ping pong检查,也是有概率出现io引起的异常。现在绝大数的连接池基本都规避了该问题。

解决方法

就是判断网络io异常引起的redis对象,然后重新new一个连接就可以了。

// xiaorui.cc

package main

import (
    "errors"
    "fmt"
    "io"
    "strings"
    "time"

    "github.com/gomodule/redigo/redis"
)

var (
    RedisClient *redis.Pool
)

func init() {
    var (
        host string
        auth string
        db   int
    )
    host = "127.0.0.1:6379"
    auth = ""
    db = 0
    RedisClient = &redis.Pool{
        MaxIdle:     100,
        MaxActive:   4000,
        IdleTimeout: 180 * time.Second,
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
            if nil != err {
                return nil, err
            }
            return c, nil
        },
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
                return nil
            }
            _, err := c.Do("PING")
            return err
        },
    }

}

func main() {
    rd := RedisClient.Get()
    defer rd.Close()

    fmt.Println("please kill redis server")
    time.Sleep(5 * time.Second)

    fmt.Println("please start redis server")
    time.Sleep(5 * time.Second)

	resp, err := redis.String(redo("SET", "push_primay", "locked"))
    fmt.Println(resp, err)
}

func IsConnError(err error) bool {
    var needNewConn bool

    if err == nil {
        return false
    }

    if err == io.EOF {
        needNewConn = true
    }
    if strings.Contains(err.Error(), "use of closed network connection") {
        needNewConn = true
    }
    if strings.Contains(err.Error(), "connect: connection refused") {
        needNewConn = true
    }
    return needNewConn
}

// 在pool加入TestOnBorrow方法来去除扫描坏连接
func redo(command string, opt ...interface{}) (interface{}, error) {
    rd := RedisClient.Get()
    defer rd.Close()

    var conn redis.Conn
    var err error
    var maxretry = 3
    var needNewConn bool

    resp, err := rd.Do(command, opt...)
    needNewConn = IsConnError(err)
    if needNewConn == false {
        return resp, err
    } else {
        conn, err = RedisClient.Dial()
    }

    for index := 0; index < maxretry; index++ {
        if conn == nil && index+1 > maxretry {
            return resp, err
        }
        if conn == nil {
            conn, err = RedisClient.Dial()
        }
        if err != nil {
            continue
        }

        resp, err := conn.Do(command, opt...)
        needNewConn = IsConnError(err)
        if needNewConn == false {
            return resp, err
        } else {
            conn, err = RedisClient.Dial()
        }
    }

    conn.Close()
    return "", errors.New("redis error")
}

// xiaorui.cc

只要看到 “use of closed network connection” 、 “connect: connection refused”、io.EOF 都会new一个先连接。常规的思路应该是,当前连接有io的异常,重新new一个新连接,然后把新连接替换老连接,但是redigo没有类似的操作入口,导致新连接游离在pool外面。 我们的主要目的是为了 兼容redis client的异常,所以临时的短连接也是可以接受,只需要等到下个TestOnBorrow的检测周期就可以了。


总结:

       gomodule / redigo的易用性不错,个人感觉体验要比go-redis强一些,但是gomodule/redigo的作者在issue里说,不打算支持redis cluster协议 ? look  https://github.com/gomodule/redigo/issues/319 。

      好在社区里有人基于redigo做了易用性相当高的redis cluster client库 https://github.com/chasex/redis-go-cluster。大家可以看下 redis-go-cluster的源码,里面做了各种multi key下的聚合操作。默认redis cluster client是不支持单次多slot区间key的使用,但redis-go-cluster解决了这该类问题,实现的原理很简单,就是内部把key分到不同的队列里,然后开多个goroutine发送。如果中间出现slot migrate问题,那么重来一遍。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc