多进程下gevent遇到管道冲突问题

前言:

在我的Python技术群里,有朋友问我一个gevent的问题,在multiprocessing 使用 gevent的时候,为什么会出现管道卡死的情况?  

一听到这些字眼,我当时迅速回答出答案, 是由于 坑爹的 monkey.patch_all() 补丁引起的..   一般来说,盲目使用monkey.patch_all()的人,可以说对gevent的实现原理很匮乏….

该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。http://xiaorui.cc/?p=4710

别人给我的样例代码有些长,下面是精简后的代码:

# xiaorui.cc
# -*- coding: utf-8 -*-

import multiprocessing

import gevent
from gevent.pool import Pool
from gevent import monkey, sleep
monkey.patch_all()

def report_task(task):
    jobs = []
    for i in range(10):
        jobs.append(gevent.spawn(krun, task))
    gevent.joinall(jobs)

def krun(task):
    while 1:
        gevent.sleep(1)

if __name__ == "__main__":
    plist = []
    pool = multiprocessing.Pool(processes=4)
    for i in range(0,2):
        pool.apply_async(report_task, (i, ))

    pool.close()
    pool.join()

遇到看似假死的问题,我们可以使用strace追踪进程的syscall情况,结果如下.

rocess 28952 attached - interrupt to quit
read(6,
ll /proc/28952/fd
lrwx------ 1 root root 64 8月   3 21:40 0 -> /dev/pts/7
lrwx------ 1 root root 64 8月   3 21:40 1 -> /dev/pts/7
l-wx------ 1 root root 64 8月   3 21:40 10 -> pipe:[82656015]
lrwx------ 1 root root 64 8月   3 21:40 2 -> /dev/pts/7
lr-x------ 1 root root 64 8月   3 21:40 3 -> pipe:[82656009]
l-wx------ 1 root root 64 8月   3 21:40 4 -> pipe:[82656009]
lr-x------ 1 root root 64 8月   3 21:40 5 -> /dev/urandom
lr-x------ 1 root root 64 8月   3 21:40 6 -> pipe:[82656012]
l-wx------ 1 root root 64 8月   3 21:40 7 -> pipe:[82656012]
lrwx------ 1 root root 64 8月   3 21:40 8 -> [eventpoll]
lr-x------ 1 root root 64 8月   3 21:40 9 -> pipe:[82656015]

出现的问题:

我们会发现程序运行了,但是没有按照我们预想的逻辑运行。 我们发现 multiprocessing 跟gevent的组合 阻塞到 pipe管道上。

通过对一些库包的理解,我们可以得出使用pipe逻辑的模块有multiprocessing Pool 和 multiprocessing Join. 

 multiprocessing Pool使用了multiprocessing Queue, Queue里面有用到pipe,

multiprocessing Queue

multiprocessing queue 没有使用Value那种mmap进程空间。 而是很巧妙的实现了数据的共享。 cpython启动的时候,会在main thread实例化deque,另外还创建了一对读写pipe,生产者和消费着就是靠着pipe来缓冲传输数据。除了这些之外,他也有 Lock,condition的。使用lock来保证数据一致性,cond来通知生产者和消费者来put,或者get收。 

multiprocessing Join

另外再说pool join 有两种实现,一个是忙轮训,py 源码已经写死了,500us 轮一次,每次去trylock。还有一种是基于锁的,这个可以规避忙查询,让内核帮你做事件的通知。子线程执行完毕后,会主动释放锁,main  thread 自然也就拿到锁了。 

Gevent背靠epoll 封装的libev 的事件器,libev 只支持 fd ,时间,信号的就绪通知。不支持锁什么的就绪通知,那如何实现异步非阻塞? 如何就绪通知syscall 已经调用! gevent patch ! 把一些使用库包阻塞的方法都用补丁替换掉。

线程是用锁来等待通知,那么我就用别的方法去等待通知,比如pipe 管道。。。libev 是支持pipe 的。

根据断点排除,问题是出在 multiprocessing pool上。

再说gevent thread patch,  这线程的patch在社区中反应很平淡,不仅仅是平淡,不少人都觉得这个patch是个鸡肋。。。 先是没多少人在用,另外它还会引起这种小问题。。。
加入thread patch之后,multiprocessing也是受影响的。gevent在1.2之前,gevent给自己的定位是 封装所有的并发接口。。。  但,明显不合理。
当你在gevnet 加入thread patch时,gevent会帮你加入pipe的相关patch.
至于为什么会出现 假死,程序一直挂在read pipe上,不运行…   只是因为thread patch设计的问题… 

解决方法,有两个:

第一个:
用什么patch,就引入啥patch。

第二个方法,
monkey.patch_all(thread=False) 

第三个方法,

调整下import gevent位置,进程fork之后再惰性加载gevent相关方法,这样不污染主线程的环境。

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc