Reading the gevent source: multi-process task scheduling with gevent's threadpool

Two incidents happened over the past couple of days. One was the sentiment-analysis API getting hammered by someone until it clogged up, which brought down the whole dynamic page parsing and finally took redis with it. The other was a bug in the dynamic IP rotation module. Troubled times. I have spent the last few evenings reading the gevent source code and learned a great deal; gevent's implementation is genuinely clever, leaning on the kernel's epoll to schedule a whole series of IO operations without blocking.

 

As you know, gevent is a coroutine library. Coroutines solve IO blocking in user space: every potentially blocking operation gets registered with epoll, and the hub switches to another greenlet instead of waiting. gevent itself runs on a single thread, so on its own it has no multi-process support, but we can get a multi-process mode for IO-bound work by giving each process its own group of greenlets. Today I went through the gevent threadpool code; it feels a bit like multiprocessing's ThreadPool. We all know Python threads are a pain (thanks to the GIL), and my current server-side architecture is basically multi-process plus coroutines. Back to the point: how do you do multi-process with gevent? Looking through the official issues, someone suggested using the threadpool. Skimming the code, the pool starts native worker threads and installs a fork watcher, so a process created with os.fork gets its own clean pool; I won't walk through every scheduling detail, the key piece is `from gevent.hub import get_hub`.
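To make the "multi-process plus coroutine" layout concrete, here is a minimal sketch of that pattern, assuming the parent forks its workers before it touches the hub so each child gets a clean event loop (the process count, function names and sleep times are my own choices):

import os
import gevent

def handle(task_id):
    # stands in for some IO-bound work
    gevent.sleep(0.1)
    print 'pid %s finished task %s' % (os.getpid(), task_id)

def worker_process(offset):
    # each child owns its own hub/event loop, so greenlets in different
    # processes really do run in parallel on different cores
    jobs = [gevent.spawn(handle, offset * 10 + i) for i in xrange(10)]
    gevent.joinall(jobs)

children = []
for n in xrange(4):
    pid = os.fork()
    if pid == 0:
        worker_process(n)
        os._exit(0)
    children.append(pid)

for pid in children:
    os.waitpid(pid, 0)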

More gevent articles: http://xiaorui.cc/?p=1530


Below is how my test performed. The results are acceptable, and the compute tasks were spread fairly evenly. The test ran on a single cloud host, so it is somewhat one-sided.


The test code is as follows:

import time
import gevent
from gevent.threadpool import ThreadPool

def burn_cpu(n):
    # pure CPU burner for a CPU-bound test; it is not used in the timing below
    total = 0
    for i in xrange(n):
        total += i + 1 * 20 / 10 * 10 / 10
    return total

def test(n, m):
    # builds the same m-key dict n times and returns the last one
    vals = []
    keys = []
    for i in xrange(m):
        vals.append(i)
        keys.append('a%s'%i)
    d = None
    for i in xrange(n):
        d = dict(zip(keys, vals))
    return d

pool = ThreadPool(20)
start = time.time()
for _ in xrange(10):
    pool.spawn(test, 1000000,100)
gevent.wait()
delay = time.time() - start
print 'Ran test(1000000, 100) 10 times on a 20-thread pool: %.3fs' % delay
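If you also want the return values back, each pool.spawn call returns an AsyncResult; here is a small variation of the timing loop above, reusing the same pool and test function (still just a sketch of mine):

results = [pool.spawn(test, 1000000, 100) for _ in xrange(10)]
pool.join()                              # or gevent.wait(results)
print [len(r.get()) for r in results]    # each get() hands back the dict built by test()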

Below is part of the ThreadPool source (these are methods of the ThreadPool class in gevent/threadpool.py, shown without the class indentation). _set_size adjusts the number of worker threads, _on_fork is the handler that re-initializes the pool after an os.fork, kill shuts the pool down, spawn submits a task, and _add_thread starts one more worker thread.

def _set_size(self, size):
    if size < 0:
        raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
    if size > self._maxsize:
        raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
    if self.manager:
        self.manager.kill()
    while self._size < size:
        self._add_thread()
    delay = 0.0001
    while self._size > size:
        while self._size - size > self.task_queue.unfinished_tasks:
            self.task_queue.put(None)
        if getcurrent() is self.hub:
            break
        sleep(delay)
        delay = min(delay * 2, .05)
    if self._size:
        self.fork_watcher.start(self._on_fork)
    else:
        self.fork_watcher.stop()

size = property(_get_size, _set_size)
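For context, the grow/shrink mechanics of _set_size can be exercised directly through the size property; a small hypothetical demonstration based only on the code quoted above:

from gevent.threadpool import ThreadPool

demo_pool = ThreadPool(10)   # maxsize is 10, but no worker threads are started yet
demo_pool.size = 4           # _set_size starts four native threads right away
demo_pool.size = 1           # three None sentinels are queued; idle workers exit as they pick them up
demo_pool.size = 0           # same effect as demo_pool.kill(); the fork watcher is stopped too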

def _init(self, maxsize):
    self._size = 0
    self._semaphore = Semaphore(1)
    self._lock = Lock()
    self.task_queue = Queue()
    self._set_maxsize(maxsize)

def _on_fork(self):
    # fork() only leaves one thread; also screws up locks;
    # let's re-create locks and threads
    pid = os.getpid()
    if pid != self.pid:
        self.pid = pid
        # Do not mix fork() and threads; since fork() only copies one thread
        # all objects referenced by other threads has refcount that will never
        # go down to 0.
        self._init(self._maxsize)

def join(self):
    delay = 0.0005
    while self.task_queue.unfinished_tasks > 0:
        sleep(delay)
        delay = min(delay * 2, .05)

def kill(self):
    self.size = 0

def _adjust_step(self):
    # if there is a possibility & necessity for adding a thread, do it
    while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
        self._add_thread()
    # while the number of threads is more than maxsize, kill one
    # we do not check what's already in task_queue - it could be all Nones
    while self._size - self._maxsize > self.task_queue.unfinished_tasks:
        self.task_queue.put(None)
    if self._size:
        self.fork_watcher.start(self._on_fork)
    else:
        self.fork_watcher.stop()
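Worth noting: worker threads are created lazily. _adjust_step only starts a thread while there is more unfinished work than live workers, so a freshly built pool has size 0 until the first spawn. A quick way to observe this, reusing the test function from earlier (the size peeking is purely for illustration):

from gevent.threadpool import ThreadPool

fresh_pool = ThreadPool(20)
print fresh_pool.size                    # 0: no worker threads yet
r = fresh_pool.spawn(test, 1000, 100)    # spawn() calls adjust(), which starts exactly one worker here
print fresh_pool.size                    # 1
r.get()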

def _adjust_wait(self):
    delay = 0.0001
    while True:
        self._adjust_step()
        if self._size <= self._maxsize:
            return
        sleep(delay)
        delay = min(delay * 2, .05)

def adjust(self):
    self._adjust_step()
    if not self.manager and self._size > self._maxsize:
        # might need to feed more Nones into the pool
        self.manager = Greenlet.spawn(self._adjust_wait)

def _add_thread(self):
    with self._lock:
        self._size += 1
    try:
        start_new_thread(self._worker, ())
    except:
        with self._lock:
            self._size -= 1
        raise
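The _worker function that start_new_thread launches is not quoted here. Roughly, every native thread sits in a loop like the following simplified paraphrase (not the verbatim gevent source, and error handling is omitted): it blocks on task_queue.get(), treats None as the shutdown sentinel, and otherwise runs the task and hands the outcome back to the hub's thread through the ThreadResult.

def _worker_sketch(pool):
    # simplified paraphrase of ThreadPool._worker, not the real implementation
    while True:
        task = pool.task_queue.get()          # blocks in the native thread, never in the hub
        try:
            if task is None:
                # shutdown sentinel queued by _set_size/_adjust_step; the real
                # worker also decrements pool._size before returning
                return
            func, args, kwargs, thread_result = task
            value = func(*args, **kwargs)     # the real code wraps this in try/except and
                                              # reports failures through thread_result as well
            thread_result.set(value)          # wakes the AsyncResult waiting on the hub side
        finally:
            pool.task_queue.task_done()       # lets join() and unfinished_tasks track progress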

def spawn(self, func, *args, **kwargs):
    while True:
        semaphore = self._semaphore
        semaphore.acquire()
        if semaphore is self._semaphore:
            break
    try:
        task_queue = self.task_queue
        result = AsyncResult()
        thread_result = ThreadResult(result, hub=self.hub)
        task_queue.put((func, args, kwargs, thread_result))
        self.adjust()
        # rawlink() must be the last call
        result.rawlink(lambda *args: self._semaphore.release())
        # XXX this _semaphore.release() is competing for order with get()
        # XXX this is not good, just make ThreadResult release the semaphore before doing anything else
    except:
        semaphore.release()
        raise
    return result
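The practical payoff of spawn returning an AsyncResult is that waiting on it only blocks the calling greenlet, not the hub, so blocking calls pushed into the pool can coexist with ordinary greenlets. A minimal sketch (function names and timings are my own):

import time
import gevent
from gevent.threadpool import ThreadPool

io_pool = ThreadPool(3)

def blocking_call(sec):
    time.sleep(sec)                  # a genuinely blocking call, but it runs in a native thread
    return sec

def ticker():
    for _ in xrange(5):
        print 'hub is still responsive'
        gevent.sleep(0.1)

g = gevent.spawn(ticker)
result = io_pool.spawn(blocking_call, 0.5)   # returns an AsyncResult immediately
print result.get()                           # blocks only this greenlet; ticker keeps printing
g.join()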


Once again, the original post lives at xiaorui.cc.

3 Responses

  1. dongshuiluo@icloud.com June 3, 2016 / 7:58 pm

    Isn't gevent threadpool useless for CPU BOUND work? Can it actually use multiple cores?

    • 峰云就她了 June 4, 2016 / 2:43 am

      Its implementation forks child processes with os.fork, which means it can use multiple cores. Coroutines are mostly suited to IO Bound scenarios.

  2. 360sa June 4, 2015 / 7:02 am

    I have used it; it just feels a bit neither fish nor fowl.
