前言:
又是golang的定时器时间轮库,这是我实现的第二版时间轮了 😅 ,以前的版本不是不能用,不好用而已。主要问题是实现偏复杂,就懒得维护,索性直接重新造轮子了。
以前是如何实现的 ?以前时间轮每个格子里是一个最小堆,由于堆的时间复杂度问题,导致密集场景下cpu还是不低。当然对比golang timer要好的多。 当然也有优点,为了支持并发操作时间轮,把锁的粒度细到每个槽位,减少了锁的竞争。
第一版的代码,有兴趣可以看看,https://github.com/rfyiamcool/go-ringtimer
话说,当初为什么要在时间轮里加入最小堆,而不是双端列表或者map这类的结构。主要是考虑定时任务超过了时间轮的大小,而现在改用map来存,对于超过时间轮周期的任务设定一个round环数,每轮一圈就减 1 。
该文章后续仍在不断的更新修改中, 请移步到原文地址 http://xiaorui.cc/?p=6160
老生常谈
golang在1.9.x版本后优化了定时器,以前为一个四叉堆,现在是多个四叉堆,具体你能用到多少个堆,要看你的gomaxprocs配置大小了。go timer init里初始化了64个timerBucket,那么time会如何选择timerBucket? 是通过g.m.p.id % 64取摸拿到的timerBucket。
// xiaorui.cc func (t *timer) assignBucket() *timersBucket { id := uint8(getg().m.p.ptr().id) % timersLen t.tb = &timers[id].timersBucket return t.tb }
那么golang分了这么多时间堆是干嘛的? 主要是为了解决锁的竞争。既然做了优化,那么时间轮的意义又是什么?
golang标准库里实现的定时器都是高精度的,高精度带来了频发的读写操作heap和锁的竞争压力。
时间轮的意义在于把精度放低,比如同一秒内的定时任务放凑在一起,数据结构改用map, 那么时间轮只需要o(1)的时间复杂度就可以把任务放进去,只需要一次加锁放锁就可以把同一个时间精度的任务都捞出来。
go timewheel
timewheel的代码实现原理我这里就不细说了,源码已经推到github里了,有兴趣的朋友可以看下。https://github.com/rfyiamcool/go-timewheel
下面列下go-timewheel间轮的基本用法:
// xiaorui.cc // https://github.com/rfyiamcool/go-timewheel/blob/master/README.md // 初始化时间轮, 精度为1s,槽位360个 tw, err := NewTimeWheel(1 * time.Second, 360) if err != nil { panic(err) } // 启动时间轮 tw.Start() // 关闭时间轮 tw.Stop() // 添加任务 task := tw.Add(5 * time.Second, func(){}) // 删除任务 tw.Remove(task) // 添加周期性任务 task := tw.AddCron(5 * time.Second, func(){ ... }) // 实现time.Sleep tw.Sleep(5 * time.Second) similar to time.After() // 实现After <- tw.After(5 * time.Second) similar to time.NewTimer // 实现NewTimer定时器 timer :=tw.NewTimer(5 * time.Second) <- timer.C timer.Reset(1 * time.Second) timer.Stop() // 实现NewTicker定时器 timer :=tw.NewTicker(5 * time.Second) <- timer.C timer.Stop() // 实现go time的AfterFunc runner :=tw.AfterFunc(5 * time.Second, func(){}) <- runner.C runner.Stop()
功能点实现思路
如何解决提交的定时任务小于时间轮的精度?
如果小于时间轮精度,那么就强制该任务为一个时间轮精度。
如何保证map结构的读写安全?
增删改查都在一个协程里,添加删除任务也只是添加到chan里。
如何保证调度器和任务添加在一个bucket的冲突问题?
比如调度器正在执行bucket 1,但是添加任务的槽位也是bucket 1,由于写安全,添加是在调度器执行之后,那么可以想象该任务算是被遗漏了,只有在下次轮询才能被执行。当然,还是有办法解决的。
我采用的方法跟上面一样,所有时间轮任务的管理都在一个协程里。
调度器每次执行任务是按照time.Ticker走的,如果调度器执行流发生超时?
一般是不会出现超时。但如果发生超时,会导致时间轮变慢。我们可以独立time.Ticker定时器,go默认给time.Ticker缓冲为1,我们可以多缓冲几个事件。
// xiaorui.cc func (tw *TimeWheel) tickGenerator() { if tw.tickQueue != nil { return } for !tw.exited { select { case <-tw.ticker.C: select { case tw.tickQueue <- time.Now(): default: panic("raise long time blocking") } } } } func (tw *TimeWheel) schduler() { queue := tw.ticker.C if tw.tickQueue == nil { queue = tw.tickQueue } for { select { case <-queue: tw.handleTick() ...
如何兼容了golang time定时器的实现?
通过在定时任务里加入各种功能的回调方法来实现的,这里在时间轮里实现了类似标准库的 time.After, time.Sleep, time.NewTimer, time.AfterFunc, time.Tikcer的方法。
如何解决go标准库里timer, ticker stop后,select无法接收到已经stop事件的问题
我在timer ticker里封装了context cancel,当用户触发stop的时候,可以使用ctx来得知该定时器已经被关闭。
// xiaorui.cc // similar to golang std timer type Timer struct { task *Task tw *TimeWheel fn func() // external custom func C chan bool cancel context.CancelFunc Ctx context.Context } func (t *Timer) Reset(delay time.Duration) { var task *Task if t.fn != nil { // use AfterFunc task = t.tw.addAny(delay, func() { t.fn() notfiyChannel(t.C) }, modeNotCircle, modeIsAsync, // must async mode ) } else { task = t.tw.addAny(delay, func() { notfiyChannel(t.C) }, modeNotCircle, modeNotAsync) } t.task = task } func (t *Timer) Stop() { t.task.stop = true t.cancel() t.tw.Remove(t.task) }
在时间轮里,调度器处理事件时,区分了同步和异步调用
参考了go time的实现,当定时任务类型是通知类型的,比如timer.C,那么就可以用同步调用,因为在select里实现了default且chan buf为1,所谓不会阻塞。 当进行AfterFunc时,就使用go func调用。
简单说,直接函数调用肯定要比go func快的多的多。虽然golang实现了gfree来缓存了goroutine对象,但毕竟要经过runtime的pmg调度。
// xiaorui.cc func AfterFunc(d Duration, f func()) *Timer { t := &Timer{ r: runtimeTimer{ when: when(d), f: goFunc, // go func 异步调用 arg: f, }, } startTimer(&t.r) return t } func goFunc(arg interface{}, seq uintptr) { go arg.(func())() } func NewTimer(d Duration) *Timer { c := make(chan Time, 1) // 缓冲为1,提高了效率 t := &Timer{ C: c, r: runtimeTimer{ when: when(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t } func sendTime(c interface{}, seq uintptr) { select { case c.(chan Time) <- Now(): default: // 不阻塞 } }
性能
没有做太深度的压力测试。添加50w个定时任务都在合理的时间内都执行完毕。
// xiaorui.cc add timer cost: 330.263465ms recv sig cost: 1.196655379s
如果性能还是达不到要求,可以封装多个时间轮到一个池子,类似go time标准库里timerprc的实现。
可能不会存在的问题?
如果添加的定时器超过时间轮的大小,那么我这里会给他加入一个round字段,标明他的轮数。那么问题来了,如果大量的任务超过了时间轮大小,那么一个槽位的map里除了存在正常的任务外,且会存在一些下一轮才能执行的任务。
这样会造成遍历的开销? 是有点,但时间轮的场景一般是用在时间短且任务多的超时场景里。如果任务的周期有些长,可以多实例化几组时间轮,比如精度为秒的和分钟的各一个时间轮。
结尾:
时间轮的效率确实很高,在一个高频的推送服务里会有各种定时器的使用,时间轮可以减少cpu消耗。