The backstory: I was calling the Elasticsearch search API from multiple threads and wanted to collect the search results that ES returned. By default, Python's threading module gives you no way to get a thread's return value.
There are a few ways to collect results after threads have run:
1. Use a Queue.
2. Use a shared variable.
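Before digging into ThreadPool, here is a minimal sketch of the first approach above: plain threading.Thread workers pushing their results onto a shared Queue. This is my own illustration, not code from the multiprocessing source (written for Python 3; on 2.7 the module is spelled `Queue` instead of `queue`):

```python
import threading
from queue import Queue  # on Python 2.7: from Queue import Queue

def worker(n, results):
    # A thread's target cannot "return" to the caller, so each
    # worker pushes its result onto the shared queue instead.
    results.put(n * n)

results = Queue()
threads = [threading.Thread(target=worker, args=(i, results)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue; sort because thread completion order is not deterministic.
collected = sorted(results.get() for _ in range(4))
print(collected)  # [0, 1, 4, 9]
```

The shared-variable approach works the same way, except the workers append to a list (guarded by a lock) instead of putting onto a queue.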
This post has accumulated too many typos, and by the time I wanted to fix them I found it had quietly been reposted to various forums, so let me note the original URL here: http://xiaorui.cc/2015/11/03/%E7%90%86%E8%A7%A3python%E7%9A%84multiprocessing-pool-threadpool%E5%A4%9A%E7%BA%BF%E7%A8%8B/
We know that multiprocessing's process Pool can return the result of each call, and multiprocessing also ships a thread-based pool where async_result.get() fetches the result. So I got curious: how is ThreadPool implemented? Or rather, how does multiprocessing implement multithreading?
Python has two entry points to this thread pool: one is the dummy Pool (multiprocessing.dummy.Pool), the other is multiprocessing.pool.ThreadPool.
```python
from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('xiaorui.cc', 'foo'))

# async_result.get() retrieves the result.
return_val = async_result.get()
```
Now let's look at how this works under the hood:
file:2.7/lib/python2.7/multiprocessing/pool.py
```python
# blog: http://xiaorui.cc

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

# Fetching the result:
class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

# When Pool is instantiated, a thread is started on the function below;
# its main job is to write results back into the cache.
class Pool(object):

    self._result_handler = threading.Thread(
        target=Pool._handle_results,
        args=(self._outqueue, self._quick_get, self._cache)
    )

    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
```
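You can see in ApplyResult.get above that when wait(timeout) returns before the result handler has marked the job ready, get raises TimeoutError. A small sketch of my own exercising that branch (written for Python 3, where the API is the same; multiprocessing.TimeoutError is the exception raised):

```python
import time
from multiprocessing import TimeoutError
from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=1)
# The worker sleeps for 2 seconds, but we only wait 0.1 seconds:
# wait(timeout) returns with _ready still False, so get raises.
res = pool.apply_async(time.sleep, (2,))
timed_out = False
try:
    res.get(timeout=0.1)
except TimeoutError:
    timed_out = True
pool.terminate()
print(timed_out)  # True
```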
file:2.7/lib/python2.7/multiprocessing/dummy/__init__.py
```python
class DummyProcess(threading.Thread):

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        threading.Thread.__init__(self, group, target, name, args, kwargs)
        self._pid = None
        self._children = weakref.WeakKeyDictionary()
        self._start_called = False
        self._parent = current_process()

Process = DummyProcess
```
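Since DummyProcess is just a thin subclass of threading.Thread, every worker in a ThreadPool is an ordinary thread in the current process. A quick check of my own (Python 3, which adds threading.main_thread()) confirms that tasks run on worker threads rather than the main thread:

```python
import threading
from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=2)
# Each task reports whether it is running on the main thread.
# ThreadPool does no pickling, so a lambda is fine here.
names = pool.map(lambda _: threading.current_thread() is threading.main_thread(),
                 range(2))
pool.close()
pool.join()
print(names)  # [False, False] -- tasks ran on DummyProcess worker threads
```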
Python's multiprocessing module handles multiprocess programming, and it contains a dummy submodule: multiprocessing.dummy wraps threading-based multithreading behind the multiprocessing API. That said, the threading-related code in multiprocessing.pool does not read very smoothly. When you instantiate a pool, it creates the number of workers you specify (defaulting to the number of CPU cores), then tasks pile up on the task queue and are consumed by the workers. Meanwhile, the ApplyResult object lets us query a task's state and fetch its result.
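Putting it together, this whole machinery is what makes result collection transparent: you submit tasks, the workers push return values through the result handler into the cache, and you read them back. A short sketch of my own, using map to fan out tasks and gather all results at once (Python 3 syntax):

```python
from multiprocessing.pool import ThreadPool

def square(n):
    return n * n

pool = ThreadPool(processes=4)
# map blocks until every task on the shared task queue has been
# processed, then returns the results in submission order.
results = pool.map(square, range(5))
pool.close()
pool.join()
print(results)  # [0, 1, 4, 9, 16]
```

This is exactly the pattern from the Elasticsearch use case at the top: replace square with a function that calls the search API, and map hands back every response without any manual Queue plumbing.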
END.