遇到一个坑,Can’t pickle instancemethod …
前言:
今天写了一个小脚本,因为涉及到cpu运算的事件,所以用了多进程. 因为大量复用了以前的类,就遇到了奇怪的问题。 我这里就不暴露我的业务代码,临时写了个小demo供大家测试下。
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。
#blog: xiaorui.cc import time import multiprocessing class Go(): def run(self,args): print 'end' print args pool = multiprocessing.Pool(processes=1) pool.apply_async(Go().run,(60,)) pool.close() pool.join()
然后你会遇到下面的报错信息: >
Exception in thread Thread-2:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 763, in run
self.__target(*self.__args, **self.__kwargs)
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py”, line 342, in _handle_tasks
put(task)
PicklingError: Can’t pickle <type ‘instancemethod’>: attribute lookup __builtin__.instancemethod failed
我们看下multiprocessing pool.py的源码,纠结下哪里出的问题.
#blog: xiaorui.cc #入口函数. def apply_async(self, func, args=(), kwds={}, callback=None): assert self._state == RUN result = ApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): put = outqueue.put #put函数 @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool): for taskseq, set_length in iter(taskqueue.get, None): i = -1 for i, task in enumerate(taskseq): if thread._state: debug('task handler found thread._state != RUN') break try: put(task) #这里报错.... except IOError: debug('could not put task on queue')
PicklingError: Can’t pickle <type ‘instancemethod’>: attribute lookup __builtin__.instancemethod failed 这个报错给出的信息很明显… 不知道这算是multiprocessing的坑,还是pickle的坑. 看了些stackoverflow回复,貌似在python3.4解决了这类问题.
我们首先要明白,这问题是怎么引起的?
python的multiprocessing pool进程池隐形的加入了一个任务队列,在你apply_async的时候,他会使用pickle序列化对象,但是python 2.x的pickle应该是不支持这种模式的序列化.
解决的方法有这么几种: >
第一种:
把执行的函数放在外面,这样就避免了把类的实例序列化. 如果不能直接放在外面的化,可以再用一个的函数来包装下.
第二种:
不使用进程池,而使用Process函数解决fork进程,这样也避免了pickle序列化对象.
#blog: xiaorui.cc import multiprocessing p = multiprocessing.Process(target=Go().run, args=(1, )) p.start() p.join()
第三种:
使用copy_reg将MethodType注册为可序列化的方法
#blog: xiaorui.cc import copy_reg import types def _pickle_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) copy_reg.pickle(types.MethodType, _pickle_method)
第四种,加入一个中间函数,使用getattr自省黑魔法调用真正的函数.
#blog: xiaorui.cc def proxy(instance, *args): getattr(instance(),'run')('xiaorui.cc')
END…