Python multiprocessing communication performance: Pipe vs Queue

Today the 12306 database leak surfaced. After lunch I threw together a small API for querying the leaked 12306 passwords, and the impact was bigger than expected: the VPS went down several times! Then I heard that playing with this kind of data could land you in real trouble, which scared me off.

As usual, noting the original address of this post: http://xiaorui.cc

Back to the topic. This afternoon I read a thread comparing the Pipe and Queue implementations, especially their performance, and it left me with some thoughts. It's the middle of the night and I can't sleep, so I might as well write them down.

The Stack Overflow thread is here: http://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue

About Pipe(): it has exactly two endpoints, so it can only connect two processes.

About Queue(): it supports multiple producers and multiple consumers.

So how do we choose between them?

If your scenario involves multiple producers and consumers, Queue is the only choice of the two (a small multi-producer sketch follows the next paragraph).

If only two processes need to talk, the logic is simple, and raw speed is what matters, then Pipe is the better pick.

A word on Pipe's use case first: because it only has two endpoints, the scenarios it fits are narrower, but its performance is clearly higher. At the 1,000,000-message level the throughput of the multi-endpoint Queue falls off badly; judged purely on speed, Pipe crushes Queue. Of course, change the scenario to multiple producers and Pipe simply can't play.
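To make the "multiple producers, one Queue" case concrete, here is a minimal sketch in the same Python 2 style as the benchmark scripts below. It is not from the Stack Overflow thread; NUM_PRODUCERS and ITEMS_PER_PRODUCER are made-up values for illustration. Each producer pushes a 'DONE' marker so the single consumer knows when every producer has finished. Pipe() cannot express this topology directly, since it only has two ends.

"""
multi_producer_queue.py  (illustrative sketch, not part of the original benchmark)
"""
from multiprocessing import Process, Queue

NUM_PRODUCERS = 4             # hypothetical values, chosen only for illustration
ITEMS_PER_PRODUCER = 10000

def producer(queue, pid):
    for ii in xrange(ITEMS_PER_PRODUCER):
        queue.put((pid, ii))  # many writers share the same queue
    queue.put('DONE')         # each producer announces that it is finished

def consumer(queue):
    finished = 0
    while finished < NUM_PRODUCERS:   # stop once every producer has said DONE
        msg = queue.get()
        if msg == 'DONE':
            finished += 1

if __name__ == '__main__':
    queue = Queue()
    consumer_p = Process(target=consumer, args=(queue,))
    consumer_p.start()
    producers = [Process(target=producer, args=(queue, n)) for n in xrange(NUM_PRODUCERS)]
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    consumer_p.join()
    print "all producers finished, consumer drained the queue"

Below are the raw timings from my box, followed by the three test scripts.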


[xiaorui@devops ~ ]python pipe.py
Sending 10000 numbers to Pipe() took 0.0436382293701 seconds
Sending 100000 numbers to Pipe() took 0.40510392189 seconds
Sending 1000000 numbers to Pipe() took 4.67704510689 seconds
[xiaorui@devops ~ ]
[xiaorui@devops ~ ]vim queue.py
[xiaorui@devops ~ ] python queue.py
Sending 10000 numbers to Queue() took 0.216951131821 seconds
Sending 100000 numbers to Queue() took 2.06925415993 seconds
Sending 1000000 numbers to Queue() took 21.7990300655 seconds
[xiaorui@devops ~ ][xiaorui@devops ~ ] python pipe.py
Sending 10000 numbers to Pipe() took 0.040070772171 seconds
Sending 100000 numbers to Pipe() took 0.44092297554 seconds
Sending 1000000 numbers to Pipe() took 3.8376250267 seconds
[xiaorui@devops ~ ][xiaorui@devops ~ ] python queue.py
Sending 10000 numbers to Queue() took 0.219097852707 seconds
Sending 100000 numbers to Queue() took 2.10510516167 seconds
Sending 1000000 numbers to Queue() took 23.5568928719 seconds
[xiaorui@devops ~ ]$
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader(pipe):
    output_p, input_p = pipe
    input_p.close()    # We are only reading
    while True:
        try:
            msg = output_p.recv()    # keep receiving until the writer closes its end
        except EOFError:
            break

def writer(count, input_p):
    for ii in xrange(0, count):
        input_p.send(ii)             # Write 'count' numbers into the input pipe

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        output_p, input_p = Pipe()
        reader_p = Process(target=reader, args=((output_p, input_p),))
        reader_p.start()     # Launch the reader process

        output_p.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, input_p) # hammer the pipe with writes
        input_p.close()        # close the write end so the reader gets EOFError
        reader_p.join()
        print "Sending %s numbers to Pipe() took %s seconds" % (count, 
            (time.time() - _start))

"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time

def reader(queue):
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()   # reader() reads from queue
                          # writer() writes to queue
        reader_p = Process(target=reader, args=(queue,))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        _start = time.time()
        writer(count, queue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print "Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader(queue):
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = JoinableQueue()   # reader() reads from queue
                                  # writer() writes to queue
        reader_p = Process(target=reader, args=(queue,))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        _start = time.time()
        writer(count, queue) # Send a lot of stuff to reader()
        queue.join()         # Wait for the reader to finish
        print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, 
            (time.time() - _start))

Also note that Queue.Queue (the queue commonly used between threads) and multiprocessing's Queue are very different:


multiprocessing queues exchange data by pickling (serializing) objects and sending them through pipes.


Queue.Queue uses a data structure that is shared between threads and locks/mutexes for correct behaviour.
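A quick way to see that difference: an object that goes through a multiprocessing Queue comes back as a pickled copy, while Queue.Queue hands the very same object to the other side. A minimal sketch, again in the post's Python 2 style (under Python 3 the thread queue module is named queue instead of Queue):

"""
queue_copy_vs_reference.py  (illustrative sketch, not part of the original benchmark)
"""
import Queue                               # the thread-level queue module
from multiprocessing import Queue as MPQueue

data = {'n': 1}

tq = Queue.Queue()
tq.put(data)
print tq.get() is data    # True: the exact same object, shared by reference

mq = MPQueue()
mq.put(data)
print mq.get() is data    # False: a copy that was pickled, pushed through a pipe, and unpickled

That per-item pickling and pipe traffic is a large part of why the multiprocessing Queue numbers above are so much slower than the bare Pipe numbers.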



2 Responses

  1. speng  2016-09-07, 9:21 pm

    ⊙﹏⊙ On my laptop the 10**6 run takes 12 seconds!

  2. 自由人  2014-12-30, 9:18 am

    Pipe really is faster; you could also try mmap.
