golang通用自定义连接池的实现

前言:

      有段时间没写博客了,顺手分享下前些时间写的一个通用连接池。应该有朋友要说 又是个轮子了。这还真不是要造轮子,像我们平时用的golang的mysql和redis库,基本都有连接池的实现。 但 go-nsq是没有连接池实现的。

      先前我认为nsq作为golang曾经很火的一个mq,应该很是完善,但经过这段时间的深耕,发现小问题不断。 go-nsq这个库作者基本不管不问了,看样子是放弃了。 既然是这种状态为毛还用nsq,我能说是历史原因么? 我后期会专门写一个文章来介绍我们使用nsq遇到的问题。  该文章后续仍在不断的更新修改中, 请移步到原文地址  http://xiaorui.cc/?p=5434

      说下怎么发现的这个问题。每当服务有大量的埋点数据需要推送到nsq时,会发现nsq推送不及时,publish的速度不给力。 服务进程的cpu不大,内存不大,网络是内网,也很小。io更很小。并发的协程也是足够的多,按道理应该很快就推送。  导出go的协程调用栈,发现不少协程都在等待nsq的chan发送。 分析go-nsq源码得知,publish的逻辑都是串行的,每个conn会new一个协程专门串行发送数据。 当我们调用publish和publishAsync其实都是往chan绑定的chan发送导入数据,然后又一个协程去串行发送。

      也正是因为go-nsq做了这样的协程安全,让我怀疑整个进程就只有一个连接。用 lsof 发现服务跟nsq连接就只有一个….  go nsq的NewProducer的config里也没有任何连接池的相关配置参数。 

feature_g 20593 work  33u     IPv4 26183593      0t0      TCP xxxx:38966-> xxxxx:4150 (ESTABLISHED)

怎么解决?

第一种:

最简单的方法让每个常驻的协程绑定一个独占的连接,这样配置很简单,但是如果你的协程加大到500,那么连接也跟着涨到500, 不合理。   这里吐槽下 nsq的性能实在不咋地 !  在大量的客户端建立连接时,出现连接失败的问题,需要不断的重试。

第二种:

标准答案使用连接池。本来想把go redis的pool给移植过来,但发现他的实现着实有些麻烦,各种状态位的判断。 golang database/sql连接池的实现也麻烦,关键是不好移植,代码有些耦合。 好吧,退一步选了 大神 fatih 的pool库,算是一个好用的半成品。我在这基础上加了点功能,比如 fatih的pool自身没有实现的 主动检查连接超时、MaxIdle配置,还有MaxConn在并发时的锁控制。

连接池的代码扔到github了,有兴趣的朋友可以瞅瞅,使用方法简单,抽象了interface{}, 只要实现相应的方法就完事了。

最近项目催促的太紧,导致没时间好好打磨下go连接池的代码,自觉地写得不优美,尤其是锁的处理,但是可以用的,我已经把nsq和kafka都嵌入了这套连接池里了。

多说一句,请注意消息队列的连接池,尽量只放入生产端连接,因为消费端一般是用来回调,他自身也是有状态,不好控制释放连接的时机。

话说,很多的消息队列的go client都没有连接池的实现,不说nsq和kafka,还有rabbtimq。


https://github.com/silenceper/pool


// xiaorui.cc

var nsqPool pool.Pool

func initPool() {
    //factory 创建连接的方法
    factory := func() (interface{}, error) {
        conn := tool.NewNsqEntry()
        conn.InitProducer()
        return conn, nil
    }

    //close 关闭连接的方法
    close := func(v interface{}) error {
        return v.(*tool.NsqEntry).Close()
    }

    //创建一个连接池: 初始化5,最大连接30
    poolConfig := &pool.PoolConfig{
        InitialCap: 5,
        MaxCap:     30,
        Factory:    factory,
        Close:      close,
        //连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
        IdleTimeout: 15 * time.Second,
    }
    p, err := pool.NewChannelPool(poolConfig)
    if err != nil {
        fmt.Println("err=", err)
    }

    //从连接池中取得一个连接
    v, err := p.Get()

    //do something
    //conn=v.(net.Conn)

    //将连接放回连接池中
    p.Put(v)

    //释放连接池中的所有连接
    //p.Release()

    //查看当前连接中的数量
    current := p.Len()
    fmt.Println("len=", current)
}

总结:

       nsq和kafka使用这套连接池已经跑了一个多月了,我一直关注连接的个数及异常,到现在为止还没出问题。连接池的实现多种多样,貌似多是使用链表加锁实现的,我这个是使用channel来实现的,目的都一样,为了线程安全。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc