前言:
研究golang rate限频的源码时候,发现rate的Wait,WaitN存在一个问题。当waiter提前return后,当前和后面waiter无法进行延迟时间重排,回收的问题。
该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=5930
问题:
我们知道go rate waitN是可以传递context的。比如,在一个场景里,5秒放一个token,池的大小就一个。开始协程g1拿到了token,g2、g3也需要token, 但是rate池子里已经没有token, g2、g3自然就需要wait方法去等待新token的产生, 那么g2,、g3需要等待多久? 最好的计划肯定是等待到下次产生token的时候,简单说g2等待5s, g3需要等待到10s。
当我们通过传递context来主动关闭g2的等待,但协程 g3 还是在等待10s。也就是说,g2退出了,按理来说后面的rate waiter应该调整下时间。但go rate没有做这方面的处理。
如何解决?
我自己想了差不多有三种方法吧。
第一种方法,修改go rate源码,可以把wait里的timer放在heap里,某waiter退出后,我们可以把大于该waiter等待时间的timer,重新reset一下。当然这个复杂度有点大了。
第二种方法,所有协程统一按照下次token的生产时间来等待,但这个问题就有点忙轮询和竞争了。或者可以自定义等待时间加配 rate allow 非阻塞方法。
第三种方法,自己去实现限频模块,new一个协程专门来生产token,可以用chan来做通知。
我最后采用的是第三种,就是自己实现令牌桶。
go rate异常源码分析:
通过代码我们拿到,最后的wait方法select两个chan上,一个是正常可拿到token的timeout定时器,另一个是业务传递进来的context上下文。
当上层把context对应的cancelFunc关闭了,wait在return之前,也只是更新lastEvent和tokens。目的在于,先的waiter在等待时,会稍微减少了一个token等待时间。
// xiaorui.cc // 等待 token func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { if n > lim.burst && lim.limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) } ... waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) } // Reserve r := lim.reserveN(now, n, waitLimit) if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // Wait if necessary delay := r.DelayFrom(now) if delay == 0 { // 需要等待,直接拿到了token return nil } t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: // 按照分配好的delay时间去等待,自然是拿到了token return nil case <-ctx.Done(): r.Cancel() // 减少下个协程的等待时间 return ctx.Err() } } // 给waiter算出需要等待的时间 func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { last := lim.last if now.Before(last) { last = now } // Avoid making delta overflow below when last is very old. maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) elapsed := now.Sub(last) if elapsed > maxElapsed { elapsed = maxElapsed } // Calculate the new number of tokens, due to time that passed. delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta if burst := float64(lim.burst); tokens > burst { tokens = burst } return now, last, tokens }
需要注意,go rate提供的三个方法中,allow是非阻塞的特性,wait及reserve都是阻塞的特性。reserve / reserveN也是存在上面goroutine退出后,等待时间无法重排的问题,毕竟wait里主要调用的是Reserve的方法。
// xiaorui.cc func reserveBug() { l := rate.NewLimiter(1, 1) for index := 0; index < 10; index++ { wg.Add(1) go func() { r := l.ReserveN(time.Now(), 1) time.Sleep(200 * time.Millisecond) r.Cancel() wg.Done() }() } wg.Wait() r := l.ReserveN(time.Now(), 1) fmt.Println("reserve need wait: ", r.Delay()) }
总结:
rate作为go的标准库,他的实现确实很巧妙。使用预计token的生产时间来分配一个个waiter等待者的延迟时间。
但这样带来的问题,一方面充斥了许多的定时器,另一方面不能让waiter快速的收敛排队。这类定时器在高并发下无疑是性能的杀手。