对于有些人来说Gevent和multiprocessing组合在一起使用算是个又高大上又奇葩的工作模式.
Python的多线程受制于GIL全局锁,同一时刻只有一个线程在执行字节码;Gevent的协程(greenlet)同样运行在单个线程之内,只是io调度上自己说了算而已。
那么如何使用多个cpu核心? 可以利用多进程multiprocessing来进行多核并行工作,在每个进程里面再使用gevent协程框架来更好地做io调度,相比线程来说减少了无谓的上下文切换.
废话少说,直接上个例子. 下面是多进程下生产者消费者的工作模式,代码本身很简单,自己跑一下就知道怎么一回事了.
#blog: xiaorui.cc
from multiprocessing import Process, cpu_count, Queue, JoinableQueue
from gevent import monkey; monkey.patch_all();
import gevent
import datetime
from Queue import Empty
class Consumer(object):
    """Drain a shared queue with a pool of gevent greenlets.

    Constructing an instance immediately spawns ``no_tasks`` greenlets that
    read from ``q`` and only returns once every greenlet has received a
    ``None`` sentinel. Doing the work in the constructor is what allows the
    class itself to be passed as a ``multiprocessing.Process`` target.

    Args:
        q: queue offering ``get()`` and ``task_done()``; ``main()`` passes a
            ``JoinableQueue``.
        no_tasks: number of consumer greenlets to spawn.
        name: label printed in front of every consumed value.
    """

    def __init__(self, q, no_tasks, name):
        self._no_tasks = no_tasks
        self._queue = q
        self.name = name
        # The constructor doubles as the process entry point, so it runs the
        # whole consume loop before returning.
        self._rungevent(self._queue, self._no_tasks)

    def _rungevent(self, q, no_tasks):
        """Spawn ``no_tasks`` greenlets running ``_printq`` and wait for them.

        ``q`` is accepted for signature compatibility; the greenlets read
        from ``self._queue``.
        """
        jobs = [gevent.spawn(self._printq) for _ in range(no_tasks)]
        gevent.joinall(jobs)

    def _printq(self):
        """Print queued values until a ``None`` sentinel is received."""
        while True:
            value = self._queue.get()
            try:
                if value is None:
                    # Sentinel: this greenlet is done.
                    break
                print("{0} time: {1}, value: {2}".format(
                    self.name, datetime.datetime.now(), value))
            finally:
                # Acknowledge every item fetched, not just the sentinel, so
                # the queue's unfinished-task count can reach zero and a
                # join() on the queue is able to return.
                self._queue.task_done()
class Producer(object):
    """Fill a shared queue from a pool of gevent greenlets.

    Constructing an instance prints ``name``, spawns ``no_tasks`` greenlets
    that each enqueue the integers 0..9999, waits for them, then enqueues
    ``consumers_tasks`` ``None`` sentinels and closes the queue. Doing the
    work in the constructor is what allows the class itself to be passed as
    a ``multiprocessing.Process`` target.

    Args:
        q: queue offering ``put()``, ``put_nowait()`` and ``close()``.
        no_tasks: number of producer greenlets to spawn.
        name: label for this producer; printed once at start-up.
        consumers_tasks: number of ``None`` sentinels to enqueue when
            production is finished.
    """

    def __init__(self, q, no_tasks, name, consumers_tasks):
        print(name)
        self._q = q
        self._no_tasks = no_tasks
        self.name = name
        self.consumer_tasks = consumers_tasks
        # The constructor doubles as the process entry point.
        self._rungevent()

    def _rungevent(self):
        """Run all producer greenlets, then signal the end of the stream."""
        jobs = [gevent.spawn(self.produce) for _ in range(self._no_tasks)]
        gevent.joinall(jobs)
        # One sentinel per consumer greenlet: each consumer loop stops on the
        # first None it reads.
        for _ in range(self.consumer_tasks):
            self._q.put_nowait(None)
        self._q.close()

    def produce(self):
        """Print and enqueue the integers 0..9999 without blocking."""
        for no in range(10000):
            # print() call form works on both Python 2 and Python 3; the
            # bare `print no` statement is a syntax error on Python 3.
            print(no)
            self._q.put(no, block=False)
def main():
    """Start one process per CPU core, alternating producers and consumers.

    Even loop indices start a ``Producer`` process and odd indices start a
    ``Consumer`` process, all sharing one ``JoinableQueue``. After every
    process has exited, a summary with the elapsed time is printed.
    """
    total_cores = cpu_count()
    q = JoinableQueue()
    producer_gevents = 10
    consumer_gevents = 7
    print("Gevent on top multiprocessing with {0} gevent coroutines"
          "\n {1} producers gevent and {2} consumers gevent".format(
              producer_gevents + consumer_gevents,
              producer_gevents, consumer_gevents))

    jobs = []
    producer_processes = 0
    consumer_processes = 0
    start = datetime.datetime.now()
    # NOTE(review): with an odd core count (including a single core) there
    # is one more producer than consumers, so some queued items are never
    # read; this may keep a producer from exiting -- confirm on such hosts.
    for x in range(total_cores):
        if x % 2 == 0:
            # Name each producer after its own index (was hard-coded to 1).
            p = Process(target=Producer,
                        args=(q, producer_gevents, "producer %d" % x,
                              consumer_gevents))
            producer_processes += 1
        else:
            p = Process(target=Consumer,
                        args=(q, consumer_gevents, "consumer %d" % x))
            consumer_processes += 1
        p.start()
        jobs.append(p)

    for job in jobs:
        job.join()

    elapsed = datetime.datetime.now() - start
    # Report what was actually started: only half of the processes are
    # producers, and each producer greenlet enqueues 10000 numbers (the loop
    # bound in Producer.produce).
    print("{0} process with {1} producer gevents and {2} consumer gevents "
          "took {3} seconds to produce {4} numbers and consume".format(
              len(jobs),
              producer_gevents * producer_processes,
              consumer_gevents * consumer_processes,
              elapsed.total_seconds(),
              producer_gevents * producer_processes * 10000))


if __name__ == '__main__':
    main()

你好,我最近在调试一段gevent的代码,于是找到你的blog。我先跑了一下你的代码,我发现如果在consumer中加入一些长时间的任务,比如这样:

```python
def _printq(self):
    while 1:
        value = self._queue.get()
        if value == 500:
            print 'value is %d' % value
            gevent.sleep(5)
        if value is None:
            self._queue.task_done()
            break
        else:
            print("{0} time: {1}, value: {2}".format(self.name, datetime.datetime.now(), value))
    return
```

必然的,在task的value为500的时候,整个程序就挂起了。有什么办法让这些长时间的任务不阻塞我的主程序呢?