腾讯的同事问我关于tornado线程池的事,正好趁着这个机会写一篇关于tornado的threadpool线程池异步工作模式实现.
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。
首先我们要知道Tornado是个单线程的服务器,另外他作为服务端是没法使用多线程处理并发,其实也没必要使用多线程,tornado 本身就是利用ioloop的异步回调解决io阻塞的问题. 需要注意的是,tornado并不是所有网络模块都是非阻塞的,对于非阻塞的模块,我们需要我们自己在iostream控制 socket层进行事件注册回调.
如果你的模块就是阻塞的,还就是不想给这模块加patch扩展, 那么你可以用下线程池.
Tornado的concurrent.futures里有个ThreadPoolExecutor,这就是线程池模块。这模块来自python3.x的线程池模块.
#blog: http://xiaorui.cc from concurrent.futures import ThreadPoolExecutor from tornado.ioloop import IOLoop from tornado.concurrent import run_on_executor class SleepHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(10) @tornado.web.asynchronous @tornado.gen.coroutine def get(self): start = time.time() res = yield self.sleep() self.write("when i sleep %f s" % (time.time() - start)) self.finish() @run_on_executor def sleep(self): time.sleep(5) return 5
这次先不细聊tornado的ThreadPoolExecutor线程池, 他的构建方法及调用方式跟我下面说的有些不一样的. 那这次说下如何从头来开发tornado的线程池。
说实话真的很巧,对于线程池的具体实现其实我都想好了,但逗比的事我在github上找到了一个老外提交的代码,功能是让io操作都异步化,但这里的异步跟ThreadPoolExecutor不一样. 但这正是我想要的.
用通俗的话来说他们俩有啥区别:
ThreadPoolExecutor , 可以实现多个人同时等待并获取结果.
thread_pool , 可以实现多个人推送任务. 他会把任务放到一个任务队列里,然后由线程池去消费. 至于结果,没存. 当然如果你想要结果的话,总的来说很像celery ,rq.
这是我加点料后的代码地址, https://github.com/rfyiamcool/tornado-threadpool
代码说明:
老外的代码在Blocking处理阶段是有问题,首先不需要判断thread_locals.thread_pool是否存在,另外在异常处理阶段有出BUG.
我在这个版本已经修复,已经热心肠的给作者发了pull requests
ThreadPoo是预先Prefork的,是根据队列的大小来增加减少. 这样带来的问题是不断的new thread,带来不必要的系统开销.
正在写,默认给他30个线程池
thread_pool做了相当于celery那种任务推送,由线程组去消费, 而没有做到把当前用户的操作同步化,不能合理的拿到结果.
正在写,在保留任务异步化的基础上,加入同步非堵塞逻辑
example.py 是我写的一个tornado thread_pool的使用实例,可以方便测试线程池的功能.
线程池提供了三个装饰器,下面的代码注释是老外提供的,我建议你们直接测试下我提供的example.py,一目了然了.
#blog: xiaorui.cc from thread_pool import in_thread_pool, in_ioloop, blocking @blocking def get_thing_from_database(): # If this method is not called from the thread pool, # it will result in a warning. return db.get('thing') @in_thread_pool def blocking_method(callback): # Call some blocking api, like a database driver. # When called, it will always return immediately, # and do its work at some future time in a thread pool. callback(get_thing_from_database()) @in_ioloop def non_blocking_method(callback, data): # Call some non-blocking api, like AsyncHTTPClient. # Guarunteed to run in a tornado IOLoop. #blog: xiaorui.cc
其实借用这种多线程模式,我们也可以改成多进程模式. 那其实实现起来也颇为简单,使用multiprocessing queue队列,确保多进程之间可以共享数据.
过两天我会看下tornado的ThreadPoolExecutor线程池的源码,到时候会写个文档分享下.