使用multiprocessing pool的某参数实现资源回收

饭饱之后,继续研究multiprocessing的设计实现。建议大家抽时间都看下常用的库的源码,常常会有意外的收获。  比如我这边发现multiprocessing Pool的构造函数有个叫maxtasksperchild参数。 我查了文档,又对应了代码,终于整明白是啥个意思。

该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。

http://xiaorui.cc/?p=2901


一个python pool进程池的例子…

import multiprocessing
pool = multiprocessing.Pool(processes=4,maxtasksperchild=2)
res.append(pool.apply_async(generate_missing_id, (scan_start,scan_end),))
pool.close
pool.join()

maxtasksperchild本意是每个进程最大的任务量,如果你maxtasksperchild = 2, 那么他每次干完两个任务后,就会spawn一个新的进程。

可以防止某个进程内存泄露被oom,这样可以通过原始kill进程的方式回收内存资源。

下面我们看下multiprocessing pool maxtasksperchild的相关实现源码. 

#xiaorui.cc

class Pool(object):
    Process = Process
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
                 # initializer 是初始化函数
                 
def _repopulate_pool(self):
    for i in range(self._processes - len(self._pool)):
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer,
                               self._initargs, self._maxtasksperchild)
                        )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        debug('added worker')

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    completed = 0

    # 每当任务大于等于我们配置的maxtasksperchild,该进程就退出去,由Process再spawn一个新子进程。 

    while maxtasks is None or (maxtasks and completed < maxtasks): 
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break
        completed += 1

测试的python代码我就不写了, 下面我们看结果multiprocessing pool maxtasksperchild的结果.  每隔一段时间除了主进程的pid不变,其他由主进程spawn的子进程在不停的变换pid。

#xiaorui.cc

[xiaorui.cc@wx-buzz-admin checkmysql]$ ps aux|grep check_mi
504       6984  1.0  0.0 146696  4780 pts/38   S+   15:33   0:00 vim check_migrage.py
504       7042 16.5  0.0 454096 19892 pts/0    Sl+  15:34   0:00 python check_migrage.py
504       7048 21.5  0.0 337080 46692 pts/0    S+   15:34   0:00 python check_migrage.py
504       7049 22.5  0.1 366628 75880 pts/0    S+   15:34   0:00 python check_migrage.py
504       7050 12.0  0.0 372884 55512 pts/0    R+   15:34   0:00 python check_migrage.py
504       7051 14.5  0.0 337672 47104 pts/0    S+   15:34   0:00 python check_migrage.py
504       7061  0.0  0.0 103260   868 pts/41   S+   15:34   0:00 grep check_mi
[xiaorui.cc@wx-buzz-admin checkmysql]$
[xiaorui.cc@wx-buzz-admin checkmysql]$
[xiaorui.cc@wx-buzz-admin checkmysql]$
[xiaorui.cc@wx-buzz-admin checkmysql]$ ps aux|grep check_mi
504       6984  0.0  0.0 146696  4780 pts/38   S+   15:33   0:00 vim check_migrage.py
504       7042  0.1  0.0 454096 19940 pts/0    Sl+  15:34   0:01 python check_migrage.py
504       8056 22.3  0.1 491108 76252 pts/0    S+   15:44   1:02 python check_migrage.py
504       8237 27.1  0.0 474724 49444 pts/0    S+   15:46   0:47 python check_migrage.py
504       8246 21.0  0.0 474724 50184 pts/0    S+   15:46   0:33 python check_migrage.py
504       8529 22.0  0.0 474724 49228 pts/0    S+   15:48   0:03 python check_migrage.py
504       8558  0.0  0.0 103260   868 pts/41   S+   15:49   0:00 grep check_mi
[xiaorui.cc@wx-buzz-admin checkmysql]$ ps aux|grep check_mi
504       6984  0.0  0.0 146696  4780 pts/38   S+   15:33   0:00 vim check_migrage.py
504       7042  0.1  0.0 454096 19944 pts/0    Sl+  15:34   0:01 python check_migrage.py
504       8056 23.9  0.1 491108 76096 pts/0    R+   15:44   1:59 python check_migrage.py
504       8237 15.9  0.0 458336 33080 pts/0    S+   15:46   1:02 python check_migrage.py
504       8246 18.4  0.0 458336 33820 pts/0    S+   15:46   1:09 python check_migrage.py
504       8529 25.8  0.0 458336 32860 pts/0    S+   15:48   1:00 python check_migrage.py

以前写过一个超级蛋疼的业务程序。说多了都是泪,因为程序连接了不同的cursor数据并导入到有序字典里,所以造成了内存越来越大。 专业点的说法是内存泄露。总之不能资源回收了。  后来搞了一套类似Master Worker的进程管理模块才搞定。 如果有人问我 为什么不直接解决内存泄露及回收资源的bug, 我…. 无话可说…   老代码,老坑.

如果早知道multiprocessing有maxtasksperchild的实现,就不用这么麻烦了。

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

电子邮件地址不会被公开。 必填项已用*标注