It started when I was using multiple threads to call the Elasticsearch API and wanted to get the search results ES returned. By default, Python's threading module gives you no way to collect a thread's return value.
There are a few ways to get results out of threads after they run:
1. Use a Queue.
2. Use a shared variable.
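The Queue approach above can be sketched like this (Python 3 syntax; the worker function and names are my own illustration, not from the original article):

```python
import threading
from queue import Queue

def worker(task_id, out_queue):
    # Instead of returning, the thread puts its result onto a shared queue.
    out_queue.put((task_id, task_id * task_id))

results_q = Queue()
threads = [threading.Thread(target=worker, args=(i, results_q)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue into a dict keyed by task id.
results = dict(results_q.get() for _ in range(4))
print(results)  # {0: 0, 1: 1, 2: 4, 3: 9}
```

Queue.put/get are thread-safe, so no extra locking is needed around the results.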
This article had too many typos; by the time I wanted to fix them, it had already been reposted to various forums without my noticing, so let me note the original address here: http://xiaorui.cc/2015/11/03/%E7%90%86%E8%A7%A3python%E7%9A%84multiprocessing-pool-threadpool%E5%A4%9A%E7%BA%BF%E7%A8%8B/
We know that multiprocessing can return the result of each call: alongside its process pool it also ships a thread-pool implementation, and async_result.get() fetches the result. I was curious how ThreadPool implements this, or in other words, how multiprocessing does multithreading.
Python exposes two entry points to this thread pool: multiprocessing.dummy.Pool and multiprocessing.pool.ThreadPool.
from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('xiaorui.cc', 'foo',))

# async_result.get() retrieves the return value of foo.
return_val = async_result.get()
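The two entry points end up in the same place: multiprocessing.dummy.Pool is a small factory function that constructs a multiprocessing.pool.ThreadPool. A quick check (Python 3 syntax):

```python
from multiprocessing.dummy import Pool as DummyPool
from multiprocessing.pool import ThreadPool

pool = DummyPool(processes=2)
# dummy.Pool just returns a ThreadPool instance.
print(isinstance(pool, ThreadPool))  # True

# Because the workers are threads, arguments are never pickled,
# so even a lambda is a valid task.
result = pool.apply_async(lambda x: x + 1, (41,))
value = result.get()
print(value)  # 42
pool.close()
pool.join()
```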
Let's look at the implementation:
file:2.7/lib/python2.7/multiprocessing/pool.py
# blog: http://xiaorui.cc
class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
# Retrieving the result:
class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value
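Both failure paths of get() are easy to trigger from user code: the TimeoutError branch when the task isn't ready in time, and the `raise self._value` branch, which re-raises the exception the worker stored. A sketch in Python 3 syntax (where the exception class lives at multiprocessing.TimeoutError):

```python
import time
import multiprocessing
from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=1)

# 1) get(timeout=...) raises TimeoutError when the task isn't ready in time.
slow = pool.apply_async(time.sleep, (0.5,))
try:
    slow.get(timeout=0.05)
    timed_out = False
except multiprocessing.TimeoutError:
    timed_out = True
print('timed out:', timed_out)

# 2) get() re-raises the exception the worker raised (the `raise self._value` branch).
fail = pool.apply_async(int, ('not a number',))
try:
    fail.get()
    reraised = False
except ValueError:
    reraised = True
print('worker ValueError re-raised:', reraised)

pool.terminate()
```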
# When the Pool class is instantiated, a thread is started on the function below;
# its main job is to write results back into the cache.
class Pool(object):

    def __init__(self, processes=None, initializer=None, initargs=()):
        # ... (abridged) the result-handler thread is created in __init__:
        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
        )

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
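The mechanics above boil down to a condition variable guarding a ready flag: the result-handler thread calls _set() on the cached ApplyResult, and the caller blocks in get() until it fires. A minimal sketch of that pattern (MiniResult and all its names are my own illustration, not part of multiprocessing):

```python
import threading

class MiniResult(object):
    """Stripped-down sketch of ApplyResult: a Condition guards a ready
    flag; get() blocks until _set() stores the value."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._ready = False
        self._value = None

    def _set(self, value):
        # Called by the "result handler" side.
        with self._cond:
            self._value = value
            self._ready = True
            self._cond.notify_all()

    def get(self, timeout=None):
        # Called by the consumer side; mirrors ApplyResult.get().
        with self._cond:
            if not self._ready:
                self._cond.wait(timeout)
            if not self._ready:
                raise RuntimeError('timed out')
            return self._value

res = MiniResult()
# Simulate the result-handler thread delivering a value after 0.1s.
threading.Timer(0.1, res._set, args=('done',)).start()
value = res.get()   # blocks until the timer thread calls _set()
print(value)  # done
```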
file:2.7/lib/python2.7/multiprocessing/dummy/__init__.py
class DummyProcess(threading.Thread):

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        threading.Thread.__init__(self, group, target, name, args, kwargs)
        self._pid = None
        self._children = weakref.WeakKeyDictionary()
        self._start_called = False
        self._parent = current_process()

Process = DummyProcess
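So a ThreadPool "process" is really a DummyProcess, i.e. a threading.Thread running in the same interpreter. A quick check (Python 3 syntax; a sketch, not from the article):

```python
import threading
from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=2)
# Inside a worker, current_thread() is the worker itself, and its concrete
# type is DummyProcess, the threading.Thread subclass shown above.
names = pool.map(lambda _: type(threading.current_thread()).__name__, range(4))
print(set(names))
pool.close()
pool.join()
```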
Python's multiprocessing module handles multiprocessing, and it contains a dummy submodule: multiprocessing.dummy wraps threading-based multithreading in the multiprocessing API. That said, the ThreadPool code in multiprocessing.pool doesn't read very smoothly. When you instantiate the pool, it creates the number of workers you specify, or else one per CPU core. Tasks are then piled onto the task queue for the worker threads to process, and through the ApplyResult object we can query a task's status and fetch its result.
END.
