前言:
研究golang rate限频的源码时候,发现rate的Wait,WaitN存在一个问题。当waiter提前return后,当前和后面waiter无法进行延迟时间重排,回收的问题。
该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5930
问题:
我们知道go rate waitN是可以传递context的。比如,在一个场景里,5秒放一个token,池的大小就一个。开始协程g1拿到了token,g2、g3也需要token, 但是rate池子里已经没有token, g2、g3自然就需要wait方法去等待新token的产生, 那么g2,、g3需要等待多久? 最好的计划肯定是等待到下次产生token的时候,简单说g2等待5s, g3需要等待到10s。
当我们通过传递context来主动关闭g2的等待,但协程 g3 还是在等待10s。也就是说,g2退出了,按理来说后面的rate waiter应该调整下时间。但go rate没有做这方面的处理。
如何解决?
我自己想了差不多有三种方法吧。
第一种方法,修改go rate源码,可以把wait里的timer放在heap里,某waiter退出后,我们可以把大于该waiter等待时间的timer,重新reset一下。当然这个复杂度有点大了。
第二种方法,所有协程统一按照下次token的生产时间来等待,但这个问题就有点忙轮询和竞争了。或者可以自定义等待时间加配 rate allow 非阻塞方法。
第三种方法,自己去实现限频模块,new一个协程专门来生产token,可以用chan来做通知。
我最后采用的是第三种,就是自己实现令牌桶。
go rate异常源码分析:
通过代码我们拿到,最后的wait方法select两个chan上,一个是正常可拿到token的timeout定时器,另一个是业务传递进来的context上下文。
当上层把context对应的cancelFunc关闭了,wait在return之前,也只是更新lastEvent和tokens。目的在于,先的waiter在等待时,会稍微减少了一个token等待时间。
// xiaorui.cc
// 等待 token
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
if n > lim.burst && lim.limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
...
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
// 需要等待,直接拿到了token
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// 按照分配好的delay时间去等待,自然是拿到了token
return nil
case <-ctx.Done():
r.Cancel() // 减少下个协程的等待时间
return ctx.Err()
}
}
// 给waiter算出需要等待的时间
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
需要注意,go rate提供的三个方法中,allow是非阻塞的特性,wait及reserve都是阻塞的特性。reserve / reserveN也是存在上面goroutine退出后,等待时间无法重排的问题,毕竟wait里主要调用的是Reserve的方法。
// xiaorui.cc
func reserveBug() {
l := rate.NewLimiter(1, 1)
for index := 0; index < 10; index++ {
wg.Add(1)
go func() {
r := l.ReserveN(time.Now(), 1)
time.Sleep(200 * time.Millisecond)
r.Cancel()
wg.Done()
}()
}
wg.Wait()
r := l.ReserveN(time.Now(), 1)
fmt.Println("reserve need wait: ", r.Delay())
}
总结:
rate作为go的标准库,他的实现确实很巧妙。使用预计token的生产时间来分配一个个waiter等待者的延迟时间。
但这样带来的问题,一方面充斥了许多的定时器,另一方面不能让waiter快速的收敛排队。这类定时器在高并发下无疑是性能的杀手。
