饭饱之后,继续研究multiprocessing的设计实现。建议大家抽时间都看下常用的库的源码,常常会有意外的收获。 比如我这边发现multiprocessing Pool的构造函数有个叫maxtasksperchild参数。 我查了文档,又对应了代码,终于整明白是啥个意思。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。
一个python pool进程池的例子…
import multiprocessing pool = multiprocessing.Pool(processes=4,maxtasksperchild=2) res.append(pool.apply_async(generate_missing_id, (scan_start,scan_end),)) pool.close pool.join()
maxtasksperchild本意是每个进程最大的任务量,如果你maxtasksperchild = 2, 那么他每次干完两个任务后,就会spawn一个新的进程。
可以防止某个进程内存泄露被oom,这样可以通过原始kill进程的方式回收内存资源。
下面我们看下multiprocessing pool maxtasksperchild的相关实现源码.
#xiaorui.cc class Pool(object): Process = Process def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): # initializer 是初始化函数 def _repopulate_pool(self): for i in range(self._processes - len(self._pool)): w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild) ) self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() debug('added worker') def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) completed = 0 # 每当任务大于等于我们配置的maxtasksperchild,该进程就退出去,由Process再spawn一个新子进程。 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break completed += 1
测试的python代码我就不写了, 下面我们看结果multiprocessing pool maxtasksperchild的结果. 每隔一段时间除了主进程的pid不变,其他由主进程spawn的子进程在不停的变换pid。
#xiaorui.cc [xiaorui.cc@wx-buzz-admin checkmysql]ps aux|grep check_mi 504 6984 1.0 0.0 146696 4780 pts/38 S+ 15:33 0:00 vim check_migrage.py 504 7042 16.5 0.0 454096 19892 pts/0 Sl+ 15:34 0:00 python check_migrage.py 504 7048 21.5 0.0 337080 46692 pts/0 S+ 15:34 0:00 python check_migrage.py 504 7049 22.5 0.1 366628 75880 pts/0 S+ 15:34 0:00 python check_migrage.py 504 7050 12.0 0.0 372884 55512 pts/0 R+ 15:34 0:00 python check_migrage.py 504 7051 14.5 0.0 337672 47104 pts/0 S+ 15:34 0:00 python check_migrage.py 504 7061 0.0 0.0 103260 868 pts/41 S+ 15:34 0:00 grep check_mi [xiaorui.cc@wx-buzz-admin checkmysql] [xiaorui.cc@wx-buzz-admin checkmysql][xiaorui.cc@wx-buzz-admin checkmysql] [xiaorui.cc@wx-buzz-admin checkmysql]ps aux|grep check_mi 504 6984 0.0 0.0 146696 4780 pts/38 S+ 15:33 0:00 vim check_migrage.py 504 7042 0.1 0.0 454096 19940 pts/0 Sl+ 15:34 0:01 python check_migrage.py 504 8056 22.3 0.1 491108 76252 pts/0 S+ 15:44 1:02 python check_migrage.py 504 8237 27.1 0.0 474724 49444 pts/0 S+ 15:46 0:47 python check_migrage.py 504 8246 21.0 0.0 474724 50184 pts/0 S+ 15:46 0:33 python check_migrage.py 504 8529 22.0 0.0 474724 49228 pts/0 S+ 15:48 0:03 python check_migrage.py 504 8558 0.0 0.0 103260 868 pts/41 S+ 15:49 0:00 grep check_mi [xiaorui.cc@wx-buzz-admin checkmysql] ps aux|grep check_mi 504 6984 0.0 0.0 146696 4780 pts/38 S+ 15:33 0:00 vim check_migrage.py 504 7042 0.1 0.0 454096 19944 pts/0 Sl+ 15:34 0:01 python check_migrage.py 504 8056 23.9 0.1 491108 76096 pts/0 R+ 15:44 1:59 python check_migrage.py 504 8237 15.9 0.0 458336 33080 pts/0 S+ 15:46 1:02 python check_migrage.py 504 8246 18.4 0.0 458336 33820 pts/0 S+ 15:46 1:09 python check_migrage.py 504 8529 25.8 0.0 458336 32860 pts/0 S+ 15:48 1:00 python check_migrage.py
以前写过一个超级蛋疼的业务程序。说多了都是泪,因为程序连接了不同的cursor数据并导入到有序字典里,所以造成了内存越来越大。 专业点的说法是内存泄露。总之不能资源回收了。 后来搞了一套类似Master Worker的进程管理模块才搞定。 如果有人问我 为什么不直接解决内存泄露及回收资源的bug, 我…. 无话可说… 老代码,老坑.
如果早知道multiprocessing有maxtasksperchild的实现,就不用这么麻烦了。
END.