前言:
前两天去菲律宾长滩岛度假去了,有段时间没有更新技术文章了。简单说说度假期间发生的事情吧,可能因为心情和身体不理想,所以也没做什么有趣的水上运动,只是在沙滩上看着大海发愣罢了,时常是一呆一上午,再一呆又一个下午。 中间有几个黑不溜秋的妹子跟我搭讪,我也实在没有心气去搭理。
另外收到一个出版社的邀请,让我去写一本关于 构建高性能python服务的书籍。加这个出版社的要求,现在最少有5个出版社找我了。还记得曾经跟出版社闹翻过,过度的改来改去实在累心,还不如写点博客,错别字都没人鸟你。 最后,这几天抽空看了群里初学者们问的几个问题,我个人觉得问点比较典型,这次专门来聊聊。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=4358
提出的问题有几个点。 为什么用time.sleep会阻塞? 为什么 gen.sleep 不会阻塞 ? 如何自己构建非阻塞的第三方客户端库包? 如何分析第三方库包在tornado下是否异步?
在看完这篇文章后,这上面的问题都会应刃而解的。首先我们要知道 time.sleep() 在异步非阻塞模型中是没有意义的,因为他只走计算,不会走io的,继续看下去你会理解我这么说的原因。 还有就是 time.sleep() 是io阻塞的操作,如果在只有一个线程的python环境跑,那么time.sleep必然会阻塞继而发生上下文切换。 tornado正好是单进程单线程的服务,所以当你使用time.sleep()的时候必然会阻塞。
那么time.sleep() 底层是如何实现的 ? select ! select 虽然常常是作为io多路复用的利器,但他本身是有timeout功能的,也就是说借助于timeout来实现sleep。 以前写过一篇文章,有兴趣的朋友可以看看。 http://xiaorui.cc/?p=3160
tornado默认是没有针对 import time ; time.sleep(seconds)做直接的patch的,但是tornado ioloop内部实现了一个heapq小顶堆,越小的值会优先pop出来。 这里gevent显得方便的多,只需要引用time patch就可以替换内存引用。
下面是一段tornado的服务端例子代码,需要说明的是 gen.sleep 跟 gen.Task( add_timeout….) 实现是一样的。对于测试,我们只需要借助curl和strace就可以了。
#xiaorui.cc class MainHandler(web.RequestHandler): @gen.coroutine def sleep(self, seconds): yield gen.Task( ioloop.IOLoop.current().add_timeout, deadline=datetime.timedelta(seconds=seconds)) @gen.coroutine def get(self): logger.warning('Hello') #time.sleep(5) # 堵塞 #yield gen.sleep(5) # 非阻塞 #yield self.sleep(seconds=5) # 非阻塞 self.write("when i sleep")
我们通过strace来分析下tornado在time.sleep 和 yield gen.sleep不同的系统调用。下面为tornado 进行 time.sleep(5) 的strace结果.
# xiaorui.cc {{EPOLLIN, {u32=3, u64=21993226147725315}}}, 1023, 3600000) = 1 accept(3, {sa_family=AF_INET, sin_port=htons(50068), sin_addr=inet_addr("127.0.0.1")}, [16]) = 8 fcntl(8, F_GETFL) = 0x2 (flags O_RDWR) fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) = 0 recvfrom(8, "GET / HTTP/1.1\r\nUser-Agent: curl"..., 65536, 0, NULL, NULL) = 167 stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0 write(2, "[W 170306 14:55:04 sl:20] Hello\n", 32) = 32 select(0, NULL, NULL, NULL, {5, 0}) = 0 (Timeout) # 当前线程直接调用select进行sleep操作,要注意他只做 timeout操作而已,不会监听某个fd事件集合 sendto(8, "HTTP/1.1 200 OK\r\nDate: Mon, 06 M"..., 207, 0, NULL, 0) = 207 setsockopt(8, SOL_TCP, TCP_NODELAY, [1], 4) = 0 setsockopt(8, SOL_TCP, TCP_NODELAY, [0], 4) = 0 stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0 write(2, "[I 170306 14:55:09 web:1971] 200"..., 61) = 61 accept(3, 0x7fff1bd9be10, [16]) = -1 EAGAIN (Resource temporarily unavailable) recvfrom(8, "", 65536, 0, NULL, NULL) = 0 close(8) = 0 epoll_wait(4,
下面是非阻塞 yield gen.sleep(5) 的结果 .
# xiaorui.cc {{EPOLLIN, {u32=3, u64=21993226147725315}}}, 1023, 3600000) = 1 accept(3, {sa_family=AF_INET, sin_port=htons(49917), sin_addr=inet_addr("127.0.0.1")}, [16]) = 8 fcntl(8, F_GETFL) = 0x2 (flags O_RDWR) fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) = 0 recvfrom(8, "GET / HTTP/1.1\r\nUser-Agent: curl"..., 65536, 0, NULL, NULL) = 167 stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0 write(2, "[W 170306 14:46:42 sl:20] Hello\n", 32) = 32 epoll_ctl(4, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=8, u64=19471204171644936}}) = 0 accept(3, 0x7fff63d4d130, [16]) = -1 EAGAIN (Resource temporarily unavailable) epoll_wait(4, {}, 1023, 4999) = 0 # epoll_wait不仅会超时5000ms-1,而且会监听eventpool epoll_wait(4, {}, 1023, 0) = 0 epoll_wait(4, {}, 1023, 0) = 0 epoll_wait(4, {}, 1023, 0) = 0 sendto(8, "HTTP/1.1 200 OK\r\nDate: Mon, 06 M"..., 207, 0, NULL, 0) = 207 #返回客户端数据 setsockopt(8, SOL_TCP, TCP_NODELAY, [1], 4) = 0 # 取消 nagle 算法 setsockopt(8, SOL_TCP, TCP_NODELAY, [0], 4) = 0 stat("/etc/localtime", {st_mode=S_IFREG|0644, st_size=405, ...}) = 0 write(2, "[I 170306 14:46:47 web:1971] 200"..., 61) = 61 # 标准输出 epoll_wait(4, {{EPOLLIN, {u32=8, u64=19471204171644936}}}, 1023, 0) = 1
不管是tornado、还是那种基于libevent、libev、libuv封装的网络服务,对于定时器的实现几乎是一致的 ! 以前就读过libevent的部分代码,他的代码设计还是相当的巧妙,对于服务构建者来说,可以节省了对于timer、signal、event的抽象.
这次主要说libevent的timer,libevent通过一个小根堆结构来保存定时事件,堆顶元素是最近即将超时的时间, 比如我们配置了三个定时器,分别在3S,1S,4S后超时,那么堆顶的元素就是1S的定时器,堆数据结构可以让我们快速的拿到top数据。
不管是select/poll/epoll,监控文件描述符的时候都会设置一个超时间隔,我们恰好可以把堆顶元素的超时时间作为这个超时间隔。
还是上面三个timer,在本次的epoll_wait中我们可以取出堆顶元素1S作为超时间隔,分以下两种情况:
1、如果1秒内没有I/O事件,那么epoll_wait将在1S后超时。此时应该触发1S的定时事件。
2、如果1秒内发生了I/O事件,那么epoll_wait返回时,1S的定时事件不应该触发。
所以,每次epoll_wait返回时,我们就依次取出堆顶元素,如果超时,就处理超时事件。 拿tornado和gevent来说,默认的timeout会很长, 360000 !
# xiaorui.cc epoll_wait(4, {{EPOLLIN, {u32=3, u64=21993226147725315}}}, 1023, 3600000) = 1 accept(3, {sa_family=AF_INET, sin_port=htons(51450), sin_addr=inet_addr("127.0.0.1")}, [16]) = 8
那么如果我正在epoll_wait,但是想更改一个定时器,这个是怎么个流程?
当你触发一个请求的时候,不管是listen fd还是 client fd 都会有一个event被唤醒,当你是新请求当然会触发listenfd –> accept ,如果是已经建立的连接,只需要重置堆的分值就可以了。 主线程每次做epoll_wait之前都会从小堆里取顶堆的值作为timeout,在tornado里对应的是 poll_timeout .
Tornado 的定制器主要分两种,一个是_Timeout 定时执行,一个是PeriodicCallback 周期任务。
_Timeout的代码有些繁多,关键是这么几个函数。
# xiaorui.cc def start(self): 获取 due_timeouts 超时任务,回调_run_callback, 获取下一次ioloop的poll_timeout时间 ! def add_timeout(self, deadline, callback, *args, **kwargs): if isinstance(deadline, numbers.Real): # 数字 return self.call_at(deadline, callback, *args, **kwargs) elif isinstance(deadline, datetime.timedelta): # datetime对象类型 return self.call_at(self.time() + timedelta_to_seconds(deadline), callback, *args, **kwargs) else: raise TypeError("Unsupported deadline %r" % deadline) def call_at(self, deadline, callback, *args, **kwargs): # 加入定时器 timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # 删除定时器,不是真的删除,而是设None timeout.callback = None self._cancellations += 1
PeriodicCallback的实现简单干脆的,他只是实现了对timeout函数的封装,也就是说 我们每次pop一个超时任务的时候,我会计算他的下次调度时间,然后再次push这个任务,这样就能保证下次的周期任务还是存在于二叉堆中的。
# xiaorui.cc def _schedule_next(self): if self._running: current_time = self.io_loop.time() if self._next_timeout <= current_time: callback_time_sec = self.callback_time / 1000.0 self._next_timeout += (math.floor((current_time - self._next_timeout) / callback_time_sec) + 1) * callback_time_sec self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
至于更详细的tornado定时器源码实现,建议大家直接去看 tornado.ioloop的def start()函数描述吧。
http://www.tornadoweb.org/en/stable/_modules/tornado/ioloop.html
更新中….
END.