关于tornado ThreadPoolExecutor(threadpool)线程池的实现


腾讯的同事问我关于tornado线程池的事,正好趁着这个机会写一篇关于tornado的threadpool线程池异步工作模式实现.


文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。

http://xiaorui.cc/2016/01/14/%E5%85%B3%E4%BA%8Etornado-threadpoolexecutorthreadpool%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%9A%84%E5%AE%9E%E7%8E%B0/


首先我们要知道Tornado是个单线程的服务器,另外他作为服务端是没法使用多线程处理并发,其实也没必要使用多线程,tornado 本身就是利用ioloop的异步回调解决io阻塞的问题.   需要注意的是,tornado并不是所有网络模块都是非阻塞的,对于非阻塞的模块,我们需要我们自己在iostream控制 socket层进行事件注册回调. 

如果你的模块就是阻塞的,还就是不想给这模块加patch扩展, 那么你可以用下线程池. 

Tornado的concurrent.futures里有个ThreadPoolExecutor,这就是线程池模块。这模块来自python3.x的线程池模块.

#blog: http://xiaorui.cc

from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.concurrent import run_on_executor
 
class SleepHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(10)
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        start = time.time()
        res = yield self.sleep()
        self.write("when i sleep %f s" % (time.time() - start))
        self.finish()

    @run_on_executor
    def sleep(self):
        time.sleep(5)
        return 5

这次先不细聊tornado的ThreadPoolExecutor线程池, 他的构建方法及调用方式跟我下面说的有些不一样的.  那这次说下如何从头来开发tornado的线程池。

说实话真的很巧,对于线程池的具体实现其实我都想好了,但逗比的事我在github上找到了一个老外提交的代码,功能是让io操作都异步化,但这里的异步跟ThreadPoolExecutor不一样.   但这正是我想要的. 

用通俗的话来说他们俩有啥区别:

ThreadPoolExecutor , 可以实现多个人同时等待并获取结果.
thread_pool  , 可以实现多个人推送任务. 他会把任务放到一个任务队列里,然后由线程池去消费. 至于结果,没存.  当然如果你想要结果的话,总的来说很像celery ,rq.

这是我加点料后的代码地址, https://github.com/rfyiamcool/tornado-threadpool

代码说明:

老外的代码在Blocking处理阶段是有问题,首先不需要判断thread_locals.thread_pool是否存在,另外在异常处理阶段有出BUG.

我在这个版本已经修复,已经热心肠的给作者发了pull requests

ThreadPoo是预先Prefork的,是根据队列的大小来增加减少. 这样带来的问题是不断的new thread,带来不必要的系统开销.

正在写,默认给他30个线程池

thread_pool做了相当于celery那种任务推送,由线程组去消费, 而没有做到把当前用户的操作同步化,不能合理的拿到结果.

正在写,在保留任务异步化的基础上,加入同步非堵塞逻辑

example.py 是我写的一个tornado thread_pool的使用实例,可以方便测试线程池的功能.

线程池提供了三个装饰器,下面的代码注释是老外提供的,我建议你们直接测试下我提供的example.py,一目了然了.

#blog: xiaorui.cc

from thread_pool import in_thread_pool, in_ioloop, blocking

@blocking
def get_thing_from_database():
    # If this method is not called from the thread pool,
    # it will result in a warning.
    return db.get('thing')

@in_thread_pool
def blocking_method(callback):
    # Call some blocking api, like a database driver.
    # When called, it will always return immediately,
    # and do its work at some future time in a thread pool.
    callback(get_thing_from_database())

@in_ioloop
def non_blocking_method(callback, data):
    # Call some non-blocking api, like AsyncHTTPClient.
    # Guarunteed to run in a tornado IOLoop.

#blog: xiaorui.cc

其实借用这种多线程模式,我们也可以改成多进程模式.  那其实实现起来也颇为简单,使用multiprocessing queue队列,确保多进程之间可以共享数据.  

过两天我会看下tornado的ThreadPoolExecutor线程池的源码,到时候会写个文档分享下. 



大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注