Combining multiprocessing and gevent in Python


To some people, running gevent together with multiprocessing looks like a working mode that is both fancy and a little odd.

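Python threads are held back by the GIL (global interpreter lock). Gevent's coroutines (greenlets) do not get around that either: they all run inside a single OS thread. The only difference is that gevent decides for itself when to switch between them, typically when a greenlet waits on I/O.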
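A minimal sketch of that single-thread behavior, assuming Python 3 with gevent installed and using only gevent.spawn, gevent.sleep and gevent.joinall. The worker names and delays are made up for illustration. Both greenlets report the same OS thread id, and they interleave only because gevent.sleep hands control back to gevent's hub.

```python
import threading

import gevent


def worker(name, delay, log):
    # Record which OS thread this greenlet runs on, then yield at gevent.sleep
    log.append((name, "start", threading.get_ident()))
    gevent.sleep(delay)  # cooperative switch point: the hub runs other greenlets meanwhile
    log.append((name, "end", threading.get_ident()))


def run_demo():
    log = []
    # "a" sleeps longer than "b", so "b" finishes first even though "a" started first
    jobs = [gevent.spawn(worker, "a", 0.05, log),
            gevent.spawn(worker, "b", 0.01, log)]
    gevent.joinall(jobs)
    return log


if __name__ == "__main__":
    for entry in run_demo():
        print(entry)
```

The log comes out as a-start, b-start, b-end, a-end, all with one thread id: concurrency without parallelism, which is why a CPU-bound workload needs extra processes.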

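So how do you use more than one CPU core? Use multiprocessing to get parallel work across cores, and inside each process use the gevent coroutine framework to schedule I/O. Compared with threads, switching between greenlets avoids a lot of unnecessary OS context switching.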
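A minimal sketch of that layout, assuming Python 3 with gevent installed: multiprocessing.Pool spreads chunks of work across cores, and inside each worker process a gevent.pool.Pool runs the items of one chunk concurrently. The names fake_io, handle_chunk and run are hypothetical, and fake_io uses gevent.sleep as a stand-in for real network I/O, so this sketch does not need monkey.patch_all().

```python
import multiprocessing

import gevent
from gevent.pool import Pool as GreenletPool


def fake_io(n):
    # Hypothetical stand-in for an I/O-bound call such as an HTTP request;
    # gevent.sleep yields to the other greenlets in this process
    gevent.sleep(0.01)
    return n * n


def handle_chunk(chunk, concurrency=10):
    # Runs inside one worker process: fan the chunk out over a pool of greenlets
    pool = GreenletPool(concurrency)
    return pool.map(fake_io, chunk)  # results keep the input order


def run(numbers, processes=None, chunk_size=50):
    # One OS process per core (multiprocessing), many greenlets per process (gevent)
    processes = processes or multiprocessing.cpu_count()
    chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]
    with multiprocessing.Pool(processes) as procs:
        results = procs.map(handle_chunk, chunks)
    return [value for part in results for value in part]
```

Call run(list(range(1000))) from under an `if __name__ == "__main__":` guard; with the spawn start method each worker re-imports the main module, and the guard keeps it from starting a new pool.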


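Enough talk, here is an example: a producer-consumer setup spread over several processes. The code itself is simple; run it once and you will see how it works.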

# blog: xiaorui.cc
# Python 3 version. Patch first, before other modules import threading/socket.
from gevent import monkey
monkey.patch_all()

import datetime
from multiprocessing import Process, Queue, cpu_count

import gevent

ITEMS_PER_PRODUCER_GEVENT = 10000


class Consumer:
    def __init__(self, q, no_tasks, name):
        self._queue = q
        self._no_tasks = no_tasks
        self.name = name
        self._rungevent()

    def _rungevent(self):
        # Spawn no_tasks greenlets in this process, all reading the shared queue
        jobs = [gevent.spawn(self._printq) for _ in range(self._no_tasks)]
        gevent.joinall(jobs)

    def _printq(self):
        while True:
            # multiprocessing.Queue.get() blocks the whole process, not just this greenlet
            value = self._queue.get()
            if value is None:  # sentinel: exactly one per consumer greenlet
                break
            print("{0} time: {1}, value: {2}".format(
                self.name, datetime.datetime.now(), value))


class Producer:
    def __init__(self, q, no_tasks, name):
        print(name)
        self._q = q
        self._no_tasks = no_tasks
        self.name = name
        self._rungevent()

    def _rungevent(self):
        jobs = [gevent.spawn(self.produce) for _ in range(self._no_tasks)]
        gevent.joinall(jobs)
        # Flush this process's buffered items into the pipe before exiting
        self._q.close()
        self._q.join_thread()

    def produce(self):
        for no in range(ITEMS_PER_PRODUCER_GEVENT):
            self._q.put(no)


def main():
    # Half of the cores produce, half consume (at least one process of each)
    n_producers = max(1, cpu_count() // 2)
    n_consumers = max(1, cpu_count() // 2)
    producer_gevents = 10
    consumer_gevents = 7
    q = Queue()
    print("Gevent on top of multiprocessing: {0} producer processes x {1} gevents, "
          "{2} consumer processes x {3} gevents".format(
              n_producers, producer_gevents, n_consumers, consumer_gevents))
    start = datetime.datetime.now()

    producers = [Process(target=Producer, args=(q, producer_gevents, "producer %d" % i))
                 for i in range(n_producers)]
    consumers = [Process(target=Consumer, args=(q, consumer_gevents, "consumer %d" % i))
                 for i in range(n_consumers)]
    for p in producers + consumers:
        p.start()

    # Send the sentinels only after every producer is done, so no consumer exits early
    for p in producers:
        p.join()
    for _ in range(n_consumers * consumer_gevents):
        q.put(None)
    q.close()
    q.join_thread()
    for c in consumers:
        c.join()

    total = n_producers * producer_gevents * ITEMS_PER_PRODUCER_GEVENT
    print("{0} processes with {1} producer gevents and {2} consumer gevents took {3} "
          "to produce and consume {4} numbers".format(
              n_producers + n_consumers, n_producers * producer_gevents,
              n_consumers * consumer_gevents, datetime.datetime.now() - start, total))


if __name__ == '__main__':
    main()
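One caveat worth knowing when mixing the two: as far as I know, monkey.patch_all() does not make multiprocessing.Queue cooperative, so while one greenlet is blocked in q.get(), every other greenlet in that process is stalled too. A common workaround, sketched here with a hypothetical helper named cooperative_get, is to poll with get_nowait() and yield with gevent.sleep() between attempts. The 10 ms poll interval is an arbitrary assumption that trades a little latency and CPU for responsiveness.

```python
import queue

import gevent


def cooperative_get(q, poll_interval=0.01):
    # Hypothetical helper: q is any queue whose get_nowait() raises queue.Empty
    # (multiprocessing.Queue does). Instead of blocking the OS thread in q.get(),
    # poll and yield so that other greenlets in this process keep running.
    while True:
        try:
            return q.get_nowait()
        except queue.Empty:
            gevent.sleep(poll_interval)  # hand control back to the gevent hub
```

In the Consumer class above, _printq would call value = cooperative_get(self._queue) instead of self._queue.get(); a greenlet that is sleeping with gevent.sleep() then gets to wake up on time instead of waiting for another greenlet's blocking read to return.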


Original blog address: xiaorui.cc

1 Response

  1. 温云龙 (Andes), October 24, 2016 / 5:31 PM

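    Hi, I have recently been debugging some gevent code, which is how I found your blog. I ran your code first and found that if I add a long-running task to the consumer, like this:

        def _printq(self):
            while 1:
                value = self._queue.get()
                if value == 500:
                    print 'value is %d' % value
                    gevent.sleep(5)
                if value is None:
                    self._queue.task_done()
                    break
                else:
                    print("{0} time: {1}, value: {2}".format(self.name, datetime.datetime.now(), value))
            return

    then, inevitably, the whole program hangs when the task value reaches 500. Is there a way to keep long-running tasks like this from blocking my main program?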
