golang调用原生epoll引起event loop阻塞问题

前言:

golang标准库net很优秀,可以让开发者轻易构建非阻塞网络服务,但开发爽快带来的问题协程数加大,比如在net/http里一个连接两个协程,grpc算是业务和keepalive心跳是四个协程,数据的进出是通过channel传输。

golang netpoll抽象了epoll事件的调用,借助runtime的gopark&goready实现就绪协程的调度,让应用层用同步方法构建io异步的网络应用。

该文章原文地址 http://xiaorui.cc/archives/6758

golang gnet

问题:

那么如何规避netpoll的协程太多的问题? 业界通用的方案是通过原syscall epoll实现网络应用,比如evio、gnet库。我先前使用过evio构建过不少服务,但当你的业务调用含有阻塞逻辑时会使event loop陷入同步阻塞。当然在evio里可以开多个event loop来加大并发,且减少同步的可能,但问题要开多少个event loop?

假设同一个event loop中有两个fd被唤醒,第一个fd完成了协议解析开始执行业务逻辑,业务逻辑是访问另一个三方的http api,但对端的api处理很慢,这就意味着你需要等待api的返回,而不能处理第二个fd事件。

那么为什么不把阻塞逻辑用go func异步出去?同一个fd先来后来好几个事件,都异步?那么又要考虑socket并发安全问题。

如不能解决阻塞问题,那么自定义epoll会特别的低效。业务逻辑的阻塞行为压根是没法避免的,除非像redis服务那样,它的业务逻辑就是访问内存的数据结构,无阻塞逻辑。

evio的另外几个问题

evio封装的epoll各类事件看起来很完美,但依旧发现了他几个问题。

第一,多个event loop引起epoll listen fd惊群问题。

虽然2.6早已解决accept惊群,但对于错误性的同时绑定listen fd依然带来惊群。nginx对于epoll listen fd是通过accept mutex实现的,简单说同一时间只有一个进程在监听listen fd,谁拿锁谁去监听。高内核4.5后的EPOLLEXCLUSIVE和内核3.6 reuseport可在内核层面实现负载均衡。

除了内核方法外,还可通过reactor的架构模型也是可以解决listen fd惊群问题。

第二, 用来唤醒epoll的eventfd写入数据没有读出。

第三, loopWrite在内核缓冲区已满无法一次写入时出现写入数据丢失。

为了规避event loop的阻塞问题,我曾经在2019年时跟evio的作者沟通过方案,可以为他提交一个协程池的方案来规避阻塞问题,他的反应很冷淡…

😅 最后还是没给他提交pr。跟同事一起在公司内部基于evio封装了一个叫goepoll网络库,重要的是加入了协程池的支持。可惜的是公司没考虑开源。好在2019年有个gnet库横空出世,不仅解决了evio的种种问题,还设计了ractor网络模型,且加入了协程池的支持。

gnet的协程池设计?

项目地址:https://github.com/panjf2000/gnet ,通过benchmark得出gnet要比我们自己封装的goepoll要更稳定,接口更友好 😅。

下面是gnet官方给出协程池的用法样例。react业务逻辑中把阻塞逻辑扔到ants构建的协程池里,最后通过asyncWrite()来唤醒event loop写入返回。

// xiaorui.cc

func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
        data := append([]byte{}, c.Read()...)
        c.ResetBuffer()

        // Use ants pool to unblock the event-loop.
        _ = es.pool.Submit(func() {
                time.Sleep(1 * time.Second)
                c.AsyncWrite(data)
        })

        return
}

对于阻塞逻辑的处理可以直接go func,也可以使用协程池,主要的关键点在于调用AsyncWrite方法。

// xiaorui.cc

func (c *conn) AsyncWrite(buf []byte) {
    if encodedBuf, err := c.codec.Encode(c, buf); err == nil {
        _ = c.loop.poller.Trigger(func() error {  // 尝试唤醒eventfd
            if c.opened {
                c.write(encodedBuf)  // 写到ringbuffer里并加入事件
                pool.PutBytes(buf)
            }
            return nil
        })
    }
}

AsyncWrite会唤醒fd所在的eventLoop.Poller,唤醒的方法是激活eventloop里的event fd。不仅是唤醒,而且会把异步执行的func放在一个队列里,这个函数队列是slice实现的。AsyncWrite中的write把resp raw写入到ringbuffer数据结构里,然后进行socket写入,当出现eagain时加入读写事件,直到ringbuffer为空。

// xiaorui.cc

func (c *conn) write(buf []byte) {
	if !c.outboundBuffer.IsEmpty() {
		_, _ = c.outboundBuffer.Write(buf)
		return
	}
	n, err := unix.Write(c.fd, buf)
	if err != nil {
		if err == unix.EAGAIN {
			_, _ = c.outboundBuffer.Write(buf)
			_ = c.loop.poller.ModReadWrite(c.fd)
			return
		}
		_ = c.loop.loopCloseConn(c, err)
		return
	}
	if n < len(buf) {
		_, _ = c.outboundBuffer.Write(buf[n:])
		_ = c.loop.poller.ModReadWrite(c.fd)
	}
}

当event loop的poller在epoll_wait拿到wakeFD时,遍历执行函数队列里的函数。为什么不在异步协程里直接write fd,主要是为了保证fd写入安全,所有的write fd都统一由于epoll poller来操作。

// xiaorui.cc

type Poller struct {
	fd            int    // epoll fd
	wfd           int    // wake fd
	wfdBuf        []byte // wfd buffer to read packet
	asyncJobQueue internal.AsyncJobQueue
}

// 存放异步协程最后的resp写入func
type AsyncJobQueue struct {
	lock sync.Locker
	jobs []func() error
}

// 把func放到队列中,并且通过写wfd事件来唤醒event loop的poller
func (p *Poller) Trigger(job internal.Job) error {
	if p.asyncJobQueue.Push(job) == 1 {
		_, err := unix.Write(p.wfd, b)
		return err
	}
	return nil
}

// 阻塞调用epoll wait,直到有新的事件
func (p *Poller) Polling(callback func(fd int, ev uint32) error) (err error) {
	el := newEventList(InitEvents)
	var wakenUp bool
	for {
		n, err0 := unix.EpollWait(p.fd, el.events, -1)
                ...
                // 遍历执行所有的事件
		for i := 0; i < n; i++ {
			if fd := int(el.events[i].Fd); fd != p.wfd {
                ...
			} else {
                // 唤醒的fd是wfd
				wakenUp = true
				_, _ = unix.Read(p.wfd, p.wfdBuf)
			}
		}
		if wakenUp {
                        // 执行asyncWrite的的收尾动作
			if err = p.asyncJobQueue.ForEach(); err != nil {
				return
			}
		}
        ...
	}
}

func (q *AsyncJobQueue) ForEach() (err error) {
	for i := range jobs {
        // 遍历执行该poller里所有异步协程收尾工作
		if err = jobs[i](); err != nil {
			return err
		}
	}
}

为什么要使用协程池?

我们在二次开发evio时加入了协程池,gnet同样也使用了协程池。原因在于一方面规避了more stack的发生,另一方面避免系统抖动时协程大量的产生,退出的协程依旧在allg结构中造成gc的扫描问题。

协程调度延迟加大问题? (很重要!!!)

event loop通过epoll_wait发现无事件时会syscall阻塞。如果event loop数目超过gomaxprocs,那么其他业务协程很大程度是拿不到调度,依赖sysmon来抢占并handoffp解绑pmg才能拿到调度。

当然epoll_wait可以非阻塞的调用,就是timeout = 0,这样其他协程虽然拿到调度,但对于poller来说无脑调用epoll_wait会增加系统消耗。所以event loop的数目要好好斟酌。我们的经验值是3/5。😁

总结:

使用自定义epoll封装网络服务,性能相当的友好。我们已经在多个项目中实践了自定义epoll方案,qps普遍能提高最少20%左右。蚂蚁金服出品的高性能sidecar mosn同样实现了自定义epoll。

前公司的领导是广发证券的架构师,他设计的推送服务就是通过原生epoll来构建的,但没有使用gnet/evio那种syscall epoll方法,而是使用c封装epoll系统调用常驻监听,使用mmap开辟一个无锁队列,这样c和go共享这个队列地址来通信。

go epoll

下个章节专门源码分析gnet reactor实现原理。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc