前言:
话说Tornado 是由Facebook开源的一个服务器“套装”,适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。
源码组织: |---__init__.py ---auth.py ---...... ---epoll.c ---ioloop.py ---iostream.py ---...
tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。
ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理
iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.
扯了半天,没有说正题…
前段时间一直在看tornado的代码,看起来有些晕,但是收获还是不小. 废话不多说。 最近有个应用场景是需要tornado subprocess组合的,具体的业务就不描述了,我所要的事情就是使用subprocess调用一个java的脚本.. 又是java…
有朋友说我抄袭subprocess代码, 我只是忘了贴源地址? 好家伙居然成抄袭了….
https://github.com/vukasin/tornado-subprocess/blob/master/tornado_subprocess/__init__.py
import subprocess import tornado.ioloop import time import fcntl import functools import os class GenericSubprocess (object): def __init__ ( self, timeout=-1, **popen_args ): self.args = dict() self.args["stdout"] = subprocess.PIPE self.args["stderr"] = subprocess.PIPE self.args["close_fds"] = True self.args.update(popen_args) self.ioloop = None self.expiration = None self.pipe = None self.timeout = timeout self.streams = [] self.has_timed_out = False def start(self): """Spawn the task. Throws RuntimeError if the task was already started.""" if not self.pipe is None: raise RuntimeError("Cannot start task twice") self.ioloop = tornado.ioloop.IOLoop.instance() if self.timeout > 0: self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout ) self.pipe = subprocess.Popen(**self.args) self.streams = [ (self.pipe.stdout.fileno(), []), (self.pipe.stderr.fileno(), []) ] for fd, d in self.streams: flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY fcntl.fcntl( fd, fcntl.F_SETFL, flags) self.ioloop.add_handler( fd, self.stat, self.ioloop.READ|self.ioloop.ERROR) def on_timeout(self): self.has_timed_out = True self.cancel() def cancel (self ) : """Cancel task execution Sends SIGKILL to the child process.""" try: self.pipe.kill() except: pass def stat( self, *args ): '''Check process completion and consume pending I/O data''' self.pipe.poll() if not self.pipe.returncode is None: '''cleanup handlers and timeouts''' if not self.expiration is None: self.ioloop.remove_timeout(self.expiration) for fd, dest in self.streams: self.ioloop.remove_handler(fd) '''schedulle callback (first try to read all pending data)''' self.ioloop.add_callback(self.on_finish) for fd, dest in self.streams: while True: try: data = os.read(fd, 4096) if len(data) == 0: break dest.extend([data]) except: break @property def stdout(self): return self.get_output(0) @property def stderr(self): return self.get_output(1) @property def status(self): return self.pipe.returncode def get_output(self, index ): return "".join(self.streams[index][1]) def on_finish(self): raise NotImplemented() class Subprocess (GenericSubprocess): def __init__ ( self, callback, *args, **kwargs): self.callback = callback self.done_callback = False GenericSubprocess.__init__(self, *args, **kwargs) def on_finish(self): if not self.done_callback: self.done_callback = True '''prevent calling callback twice''' self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out)) if __name__ == "__main__": ioloop = tornado.ioloop.IOLoop.instance() def print_timeout( status, stdout, stderr, has_timed_out) : assert(status!=0) assert(has_timed_out) print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def print_ok( status, stdout, stderr, has_timed_out) : assert(status==0) assert(not has_timed_out) print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def print_error( status, stdout, stderr, has_timed_out): assert(status!=0) assert(not has_timed_out) print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out) def stop_test(): ioloop.stop() t1 = Subprocess( print_timeout, timeout=3, args=[ "sleep","5"] ) t2 = Subprocess( print_ok, timeout=3, args=[ "ip", "a" ] ) t3 = Subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas", "1" ] ) t4 = Subprocess( print_error, timeout=3, args=[ "cat", "/etc/sdfsdfsdfsdfsdfsdfsdf" ] ) t1.start() t2.start() try: t3.start() assert(false) except: print "OK" t4.start() ioloop.add_timeout(time.time() + 10, stop_test) ioloop.start()
Tornado subprocess组合,在web api系统调用本身是尴尬的事情,尽量需要避免这样的应用。
https://github.com/vukasin/tornado-subprocess/blob/master/tornado_subprocess/__init__.py
代码抄袭的爽歪歪,拿回来就自己用。。。