源码分析tornado gen.sleep非阻塞

前言:

      前两天去菲律宾长滩岛度假去了,有段时间没有更新技术文章了。简单说说度假期间发生的事情吧,可能因为心情和身体不理想,所以也没做什么有趣的水上运动,只是在沙滩上看着大海发愣罢了,时常是一呆一上午,再一呆又一个下午。 中间有几个黑不溜秋的妹子跟我搭讪,我也实在没有心气去搭理。  

     另外收到一个出版社的邀请,让我去写一本关于 构建高性能python服务的书籍。加这个出版社的要求,现在最少有5个出版社找我了。还记得曾经跟出版社闹翻过,过度的改来改去实在累心,还不如写点博客,错别字都没人鸟你。  最后,这几天抽空看了群里初学者们问的几个问题,我个人觉得问点比较典型,这次专门来聊聊。


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.   http://xiaorui.cc/?p=4358

提出的问题有几个点。 为什么用time.sleep会阻塞? 为什么 gen.sleep 不会阻塞 ? 如何自己构建非阻塞的第三方客户端库包? 如何分析第三方库包在tornado下是否异步? 

在看完这篇文章后,这上面的问题都会应刃而解的。首先我们要知道 time.sleep() 在异步非阻塞模型中是没有意义的,因为他只走计算,不会走io的,继续看下去你会理解我这么说的原因。   还有就是 time.sleep() 是io阻塞的操作,如果在只有一个线程的python环境跑,那么time.sleep必然会阻塞继而发生上下文切换。  tornado正好是单进程单线程的服务,所以当你使用time.sleep()的时候必然会阻塞。

那么time.sleep() 底层是如何实现的 ?  select !  select 虽然常常是作为io多路复用的利器,但他本身是有timeout功能的,也就是说借助于timeout来实现sleep。 以前写过一篇文章,有兴趣的朋友可以看看。 http://xiaorui.cc/?p=3160

tornado默认是没有针对 import time ; time.sleep(seconds)做直接的patch的,但是tornado ioloop内部实现了一个heapq小顶堆,越小的值会优先pop出来。  这里gevent显得方便的多,只需要引用time patch就可以替换内存引用。

下面是一段tornado的服务端例子代码,需要说明的是 gen.sleep 跟 gen.Task( add_timeout….) 实现是一样的。对于测试,我们只需要借助curl和strace就可以了。 

#xiaorui.cc

class MainHandler(web.RequestHandler):

    @gen.coroutine
    def sleep(self, seconds):
        yield gen.Task(
            ioloop.IOLoop.current().add_timeout,
            deadline=datetime.timedelta(seconds=seconds))

    @gen.coroutine
    def get(self):
        logger.warning('Hello')
        #time.sleep(5)  # 堵塞
        #yield gen.sleep(5)  # 非阻塞 
        #yield self.sleep(seconds=5)  # 非阻塞
        self.write("when i sleep")

我们通过strace来分析下tornado在time.sleep 和 yield gen.sleep不同的系统调用。下面为tornado 进行 time.sleep(5) 的strace结果.

# xiaorui.cc
{{EPOLLIN, {u32=3, u64=21993226147725315}}}, 1023, 3600000) = 1
accept(3, {sa_family=AF_INET, sin_port=htons(50068), sin_addr=inet_addr("127.0.0.1")}, [16]) = 8
fcntl(8, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
recvfrom(8, "GET / HTTP/1.1\r\nUser-Agent: curl"..., 65536, 0, NULL, NULL) = 167
stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0
write(2, "[W 170306 14:55:04 sl:20] Hello\n", 32) = 32
select(0, NULL, NULL, NULL, {5, 0})     = 0 (Timeout)   # 当前线程直接调用select进行sleep操作,要注意他只做 timeout操作而已,不会监听某个fd事件集合
sendto(8, "HTTP/1.1 200 OK\r\nDate: Mon, 06 M"..., 207, 0, NULL, 0) = 207
setsockopt(8, SOL_TCP, TCP_NODELAY, [1], 4) = 0
setsockopt(8, SOL_TCP, TCP_NODELAY, [0], 4) = 0
stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0
write(2, "[I 170306 14:55:09 web:1971] 200"..., 61) = 61
accept(3, 0x7fff1bd9be10, [16])         = -1 EAGAIN (Resource temporarily unavailable)
recvfrom(8, "", 65536, 0, NULL, NULL)   = 0
close(8)                                = 0
epoll_wait(4,

下面是非阻塞 yield gen.sleep(5) 的结果 .

# xiaorui.cc

{{EPOLLIN, {u32=3, u64=21993226147725315}}}, 1023, 3600000) = 1
accept(3, {sa_family=AF_INET, sin_port=htons(49917), sin_addr=inet_addr("127.0.0.1")}, [16]) = 8
fcntl(8, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
recvfrom(8, "GET / HTTP/1.1\r\nUser-Agent: curl"..., 65536, 0, NULL, NULL) = 167
stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0
write(2, "[W 170306 14:46:42 sl:20] Hello\n", 32) = 32
epoll_ctl(4, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=8, u64=19471204171644936}}) = 0
accept(3, 0x7fff63d4d130, [16])         = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(4, {}, 1023, 4999)           = 0   # epoll_wait不仅会超时5000ms-1,而且会监听eventpool
epoll_wait(4, {}, 1023, 0)              = 0
epoll_wait(4, {}, 1023, 0)              = 0
epoll_wait(4, {}, 1023, 0)              = 0
sendto(8, "HTTP/1.1 200 OK\r\nDate: Mon, 06 M"..., 207, 0, NULL, 0) = 207  #返回客户端数据
setsockopt(8, SOL_TCP, TCP_NODELAY, [1], 4) = 0  # 取消 nagle 算法
setsockopt(8, SOL_TCP, TCP_NODELAY, [0], 4) = 0
stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0
write(2, "[I 170306 14:46:47 web:1971] 200"..., 61) = 61   # 标准输出
epoll_wait(4, {{EPOLLIN, {u32=8, u64=19471204171644936}}}, 1023, 0) = 1

不管是tornado、还是那种基于libevent、libev、libuv封装的网络服务,对于定时器的实现几乎是一致的 !   以前就读过libevent的部分代码,他的代码设计还是相当的巧妙,对于服务构建者来说,可以节省了对于timer、signal、event的抽象. 

这次主要说libevent的timer,libevent通过一个小根堆结构来保存定时事件,堆顶元素是最近即将超时的时间, 比如我们配置了三个定时器,分别在3S,1S,4S后超时,那么堆顶的元素就是1S的定时器,堆数据结构可以让我们快速的拿到top数据。

不管是select/poll/epoll,监控文件描述符的时候都会设置一个超时间隔,我们恰好可以把堆顶元素的超时时间作为这个超时间隔。

还是上面三个timer,在本次的epoll_wait中我们可以取出堆顶元素1S作为超时间隔,分以下两种情况:

1、如果1秒内没有I/O事件,那么epoll_wait将在1S后超时。此时应该触发1S的定时事件。
2、如果1秒内发生了I/O事件,那么epoll_wait返回时,1S的定时事件不应该触发。

所以,每次epoll_wait返回时,我们就依次取出堆顶元素,如果超时,就处理超时事件。 拿tornado和gevent来说,默认的timeout会很长, 360000 !

# xiaorui.cc

epoll_wait(4, {{EPOLLIN, {u32=3, u64=21993226147725315}}}, 1023, 3600000) = 1
accept(3, {sa_family=AF_INET, sin_port=htons(51450), sin_addr=inet_addr("127.0.0.1")}, [16]) = 8

那么如果我正在epoll_wait,但是想更改一个定时器,这个是怎么个流程? 

当你触发一个请求的时候,不管是listen fd还是 client fd 都会有一个event被唤醒,当你是新请求当然会触发listenfd –> accept ,如果是已经建立的连接,只需要重置堆的分值就可以了。 主线程每次做epoll_wait之前都会从小堆里取顶堆的值作为timeout,在tornado里对应的是 poll_timeout .

Tornado 的定制器主要分两种,一个是_Timeout 定时执行,一个是PeriodicCallback 周期任务。 

_Timeout的代码有些繁多,关键是这么几个函数。 

# xiaorui.cc
def start(self):
    获取 due_timeouts 超时任务,回调_run_callback, 获取下一次ioloop的poll_timeout时间 !

def add_timeout(self, deadline, callback, *args, **kwargs):
    if isinstance(deadline, numbers.Real):  # 数字
      return self.call_at(deadline, callback, *args, **kwargs)
    elif isinstance(deadline, datetime.timedelta):  # datetime对象类型
      return self.call_at(self.time() + timedelta_to_seconds(deadline),
                          callback, *args, **kwargs)
    else:
      raise TypeError("Unsupported deadline %r" % deadline)

def call_at(self, deadline, callback, *args, **kwargs):  # 加入定时器
    timeout = _Timeout(
        deadline,
        functools.partial(stack_context.wrap(callback), *args, **kwargs),
        self)
    heapq.heappush(self._timeouts, timeout)
    return timeout

def remove_timeout(self, timeout):  # 删除定时器,不是真的删除,而是设None
    timeout.callback = None
    self._cancellations += 1

PeriodicCallback的实现简单干脆的,他只是实现了对timeout函数的封装,也就是说 我们每次pop一个超时任务的时候,我会计算他的下次调度时间,然后再次push这个任务,这样就能保证下次的周期任务还是存在于二叉堆中的。

# xiaorui.cc

def _schedule_next(self):
    if self._running:
        current_time = self.io_loop.time()

        if self._next_timeout <= current_time:
            callback_time_sec = self.callback_time / 1000.0
            self._next_timeout += (math.floor((current_time - self._next_timeout) /
                                              callback_time_sec) + 1) * callback_time_sec

        self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

至于更详细的tornado定时器源码实现,建议大家直接去看 tornado.ioloop的def start()函数描述吧。 

http://www.tornadoweb.org/en/stable/_modules/tornado/ioloop.html

更新中….

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

电子邮件地址不会被公开。 必填项已用*标注