理解python的multiprocessing.pool threadpool多线程

起因,我用多线程去用访问elasticsearch api时候,想拿到es返回的search结果。 默认python threading是不能获取返回的结果的。
有这么几种方式可以取到多线程执行后的结果 .  
1. 使用Queue队列的方式.
2. 使用共享变量的问题

文章的笔误太多,想修改的时候,会发现文章不知不觉被转载到各个论坛了。 我这里标注下原文地址。  http://xiaorui.cc/2015/11/03/%E7%90%86%E8%A7%A3python%E7%9A%84multiprocessing-pool-threadpool%E5%A4%9A%E7%BA%BF%E7%A8%8B/


我们知道multiprocessing Process是可以返回每个调用的结果,在multiprocessing下也有个多线程模块,通过async_result.get()可以获取结果。好奇ThreadPool是如何实现的?或者说multiprocessing是如何实现的多线程. 

python有两个多线程的入口,一个是 dummy Pool  另一个是pool.ThreadPool

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('xiaorui.cc', 'foo',))

#使用async_result.get可以取出结果. 
return_val = async_result.get()

下面我们来看看

file:2.7/lib/python2.7/multiprocessing/pool.py

# blog:  http://xiaorui.cc
class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

def apply_async(self, func, args=(), kwds={}, callback=None):
    '''
    Asynchronous equivalent of `apply()` builtin
    '''
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

#取结果
class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self
    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value



#实例化Pool类的时候,就会由一个线程start下面的函数,他的主要任务是写入结果
class Pool():
self._result_handler = threading.Thread(
    target=Pool._handle_results,
    args=(self._outqueue, self._quick_get, self._cache)
    )
def _handle_results(outqueue, get, cache):
   thread = threading.current_thread()

   while 1:
       try:
           task = get()
       except (IOError, EOFError):
           debug('result handler got EOFError/IOError -- exiting')
           return

       if thread._state:
           assert thread._state == TERMINATE
           debug('result handler found thread._state=TERMINATE')
           break

       if task is None:
           debug('result handler got sentinel')
           break

       job, i, obj = task
       try:
           cache[job]._set(i, obj)
       except KeyError:
           pass

   while cache and thread._state != TERMINATE:
       try:
           task = get()
       except (IOError, EOFError):
           debug('result handler got EOFError/IOError -- exiting')
           return

       if task is None:
           debug('result handler ignoring extra sentinel')
           continue
       job, i, obj = task
       try:
           cache[job]._set(i, obj)
       except KeyError:
           pass

file:2.7/lib/python2.7/multiprocessing/dummy/__init__.py

Process = DummyProcess
class DummyProcess(threading.Thread):

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        threading.Thread.__init__(self, group, target, name, args, kwargs)
        self._pid = None
        self._children = weakref.WeakKeyDictionary()
        self._start_called = False
        self._parent = current_process()


python的multiprocessing模块是处理python多进程的模块,multiprocessing模块中有个dummy的子模块。multiprocessing.dummy对threading多线程编程进行了包装。  话说关于multiprocessing.pool Threading代码看起来不是很流畅。实例化pool的时候,创建你指定的进程数目,或者是cpu的核数。 然后把任务都堆积到task_queue有多个进程去处理。另外我们可以通过ApplyResult的对象获取任务状态及结果。 

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注