前言:
slack报警提示同事的线上系统爆出了大量的tcp time_wait。通过netstat可以判断出是postgreSQl引起的。初步分析像是连接池没有得到复用,一直在new pg连接和close连接。
我们的pg驱动是golang内置的database/sql库,该库不仅实现了sql基于使用,而且还自带连接池。database/sql库兼容mysql和postgre。
该文章后续仍在不断的更新修改中, 请移步到原文地 http://xiaorui.cc/?p=5771
排除问题
首先我们要知道谁去主动关闭连接,那么谁就会产生2ms时间的time wait。出问题的服务只有grpc server和postgre数据库两个类型的连接。
既然通过netstat可以看出问题出在postgre, 那么问题要么是我们database/sql,要么出在我们的用法上。
database/sql作为golang的标准库,不应该会有这类低级问题。
那么看我们的配置 maxIdle, maxConn, maxConn可以理解为最大的连接数,maxIdle是最大的空闲数。 我曾经看过一些框架的连接池实现,maxIdle的实现略有不同。有些框架会严格控制maxIdle,有些会放到free里,等待expire后淘汰。
那么database/sql是如何实现的? 研究下 database/sql的go源代码吧。
源代码分析
当我们使用query和begin事务操作sql的时候,会调用db.query()方法,该方法会return一个连接。
// xiaorui.cc func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) { var rows *Rows var err error // maxBadConnRetries 默认是2 for i := 0; i < maxBadConnRetries; i++ { // 从连接池中尝试获取连接 rows, err = db.query(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.query(ctx, query, args, alwaysNewConn) } return rows, err } func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) { var tx *Tx var err error // maxBadConnRetries 默认是2 for i := 0; i < maxBadConnRetries; i++ { // 从连接池中尝试获取连接 tx, err = db.begin(ctx, opts, cachedOrNewConn) if err != driver.ErrBadConn { break } } if err == driver.ErrBadConn { return db.begin(ctx, opts, alwaysNewConn) } return tx, err } func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) { dc, err := db.conn(ctx, strategy) if err != nil { return nil, err } return db.beginDC(ctx, dc, dc.releaseConn, opts) } func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) { dc, err := db.conn(ctx, strategy) if err != nil { return nil, err } return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) }
调用db.conn()时,申请连接会传递一个参数, 该参数用来说明始终实例化新连接还是从缓存中获取。 sql默认就是一个连接池,开始会先用cachedOrNewConn来获取连接。
如果总是返回ErrBadConn,那么就再通过alwaysNewConn来申请。
const ( // xiaorui.cc alwaysNewConn connReuseStrategy = iota cachedOrNewConn )
申请的连接的过程, 简单的说,配置了maxOpen,而且当前已经存在的连接超过maxOpen,那么就一直阻塞等待,等待别的协程来释放sql连接。不匹配前面的条件,那么就实例化新的连接。
// xiaorui.cc func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { if db.closed { db.mu.Unlock() return nil, errDBClosed } lifetime := db.maxLifetime // 获取空闲的连接,判断是否超时,无超时返回调用方,超时返回err, 调用方会重试,默认是重试2次 numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { conn := db.freeConn[0] if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } if err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // 等待别人释放归连接。 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. select { case <-ctx.Done(): ... case ret, ok := <-req: atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 不满足上面的条件,那么就实例化新的连接, numOpen计数。 db.numOpen++ // optimistically ci, err := db.connector.Connect(ctx) if err != nil { db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() return nil, err } dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) return dc, nil }
上面是申请连接,下面是释放和归还连接到连接池,当我们执行Query()和手动commit和rollback的时候,都会触发db.releaseConn()方法。
func (dc *driverConn) releaseConn(err error) { dc.db.putConn(dc, err, true) }
当归还连接到连接池的时候,发现当前的连接数已经大于我们自己配置的maxOpen,那么就直接close连接。如果正常连接池,那么观察下db.connRequests是否有人等待连接。
如果在等待,那么就把连接通过channel传递过去。如果没有人正在等待获取sql连接,那么就把连接放到db.freeConn里。
// xiaorui.cc func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { // 尝试把连接归还给连接池 added := db.putConnDBLocked(dc, nil) if !added { // 关闭连接 dc.Close() return } ... if !resetSession { return } ... } func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.closed { return false } if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // Remove from pending requests. if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed { if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } return false }
总结:
在分析go databse/sql的源代码的过程中,我们可以看到了tcp time wait的原因。我们的maxConn有100个, 而maxIdle确只有20个。
那么当用户请求上来的时候,每个协程都会从连接池获取连接,拿不到就实例化一个新的pg连接,用完了连接后尝试归还连接池不成功,那么就主动close连接。 谁先close,谁就有time wait。所以,原因找到了。
解决方法很简单,就是把maxIdle调大一点,其实我还是更喜欢go redis的连接池实现,不会立马把大于maxConn和小于maxIdle的连接给关掉,而是给个释放时间,相当于给一个缓冲池。不然每次都要实例化连接,然后close,调用耗时被延长。