Golang rate无法延迟重排的BUG

前言:

研究golang rate限频的源码时候,发现rate的Wait,WaitN存在一个问题。当waiter提前return后,当前和后面waiter无法进行延迟时间重排,回收的问题。

该文章后续仍在不断的更新修改中, 请移步到原文地址  http://xiaorui.cc/?p=5930

问题:

我们知道go rate waitN是可以传递context的。比如,在一个场景里,5秒放一个token,池的大小就一个。开始协程g1拿到了token,g2、g3也需要token, 但是rate池子里已经没有token, g2、g3自然就需要wait方法去等待新token的产生, 那么g2,、g3需要等待多久? 最好的计划肯定是等待到下次产生token的时候,简单说g2等待5s, g3需要等待到10s。

当我们通过传递context来主动关闭g2的等待,但协程 g3 还是在等待10s。也就是说,g2退出了,按理来说后面的rate waiter应该调整下时间。但go rate没有做这方面的处理。

如何解决?

我自己想了差不多有三种方法吧。

第一种方法,修改go rate源码,可以把wait里的timer放在heap里,某waiter退出后,我们可以把大于该waiter等待时间的timer,重新reset一下。当然这个复杂度有点大了。

第二种方法,所有协程统一按照下次token的生产时间来等待,但这个问题就有点忙轮询和竞争了。或者可以自定义等待时间加配 rate allow 非阻塞方法。

第三种方法,自己去实现限频模块,new一个协程专门来生产token,可以用chan来做通知。

我最后采用的是第三种,就是自己实现令牌桶。

go rate异常源码分析:

通过代码我们拿到,最后的wait方法select两个chan上,一个是正常可拿到token的timeout定时器,另一个是业务传递进来的context上下文。
当上层把context对应的cancelFunc关闭了,wait在return之前,也只是更新lastEvent和tokens。目的在于,先的waiter在等待时,会稍微减少了一个token等待时间。


// xiaorui.cc

// 等待 token 
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
    if n > lim.burst && lim.limit != Inf {
        return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
    }
    ...
    waitLimit := InfDuration
    if deadline, ok := ctx.Deadline(); ok {
        waitLimit = deadline.Sub(now)
    }
    // Reserve
    r := lim.reserveN(now, n, waitLimit)
    if !r.ok {
        return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
    }
    // Wait if necessary
    delay := r.DelayFrom(now)
    if delay == 0 {
        // 需要等待,直接拿到了token
        return nil
    }
    t := time.NewTimer(delay)
    defer t.Stop()
    select {
    case <-t.C:
        // 按照分配好的delay时间去等待,自然是拿到了token
        return nil
    case <-ctx.Done():
        r.Cancel()  // 减少下个协程的等待时间
        return ctx.Err()
    }
}

// 给waiter算出需要等待的时间
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    last := lim.last
    if now.Before(last) {
        last = now
    }

    // Avoid making delta overflow below when last is very old.
    maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    elapsed := now.Sub(last)
    if elapsed > maxElapsed {
        elapsed = maxElapsed
    }

    // Calculate the new number of tokens, due to time that passed.
    delta := lim.limit.tokensFromDuration(elapsed)
    tokens := lim.tokens + delta
    if burst := float64(lim.burst); tokens > burst {
        tokens = burst
    }

    return now, last, tokens
}

需要注意,go rate提供的三个方法中,allow是非阻塞的特性,wait及reserve都是阻塞的特性。reserve / reserveN也是存在上面goroutine退出后,等待时间无法重排的问题,毕竟wait里主要调用的是Reserve的方法。


// xiaorui.cc

func reserveBug() {
	l := rate.NewLimiter(1, 1)
	for index := 0; index < 10; index++ {
		wg.Add(1)
		go func() {
			r := l.ReserveN(time.Now(), 1)
			time.Sleep(200 * time.Millisecond)
			r.Cancel()
			wg.Done()
		}()
	}

	wg.Wait()
	r := l.ReserveN(time.Now(), 1)
	fmt.Println("reserve need wait: ", r.Delay())
}

总结:

rate作为go的标准库,他的实现确实很巧妙。使用预计token的生产时间来分配一个个waiter等待者的延迟时间。
但这样带来的问题,一方面充斥了许多的定时器,另一方面不能让waiter快速的收敛排队。这类定时器在高并发下无疑是性能的杀手。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc