python multiprocessing遇到Can’t pickle instancemethod问题

遇到一个坑,Can’t pickle instancemethod …

前言:

    今天写了一个小脚本,因为涉及到cpu运算的事件,所以用了多进程.  因为大量复用了以前的类,就遇到了奇怪的问题。 我这里就不暴露我的业务代码,临时写了个小demo供大家测试下。 

文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。

http://xiaorui.cc/2016/01/18/python-multiprocessing%E9%81%87%E5%88%B0cant-pickle-instancemethod%E9%97%AE%E9%A2%98/

#blog: xiaorui.cc

import time
import multiprocessing

class Go():
    def run(self,args):
        print 'end'
        print args

pool = multiprocessing.Pool(processes=1)
pool.apply_async(Go().run,(60,))
pool.close()
pool.join()

然后你会遇到下面的报错信息: >

Exception in thread Thread-2:
Traceback (most recent call last):
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
    self.run()
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py”, line 342, in _handle_tasks
    put(task)
PicklingError: Can’t pickle <type ‘instancemethod’>: attribute lookup __builtin__.instancemethod failed

我们看下multiprocessing pool.py的源码,纠结下哪里出的问题.

#blog: xiaorui.cc

#入口函数.
def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    put = outqueue.put  #put函数

@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
    for taskseq, set_length in iter(taskqueue.get, None):
        i = -1
        for i, task in enumerate(taskseq):
            if thread._state:
                debug('task handler found thread._state != RUN')
                break
            try:
                put(task)        #这里报错....
            except IOError:
                debug('could not put task on queue')

PicklingError: Can’t pickle <type ‘instancemethod’>: attribute lookup __builtin__.instancemethod failed 这个报错给出的信息很明显… 不知道这算是multiprocessing的坑,还是pickle的坑. 看了些stackoverflow回复,貌似在python3.4解决了这类问题. 

我们首先要明白,这问题是怎么引起的? 


python的multiprocessing pool进程池隐形的加入了一个任务队列,在你apply_async的时候,他会使用pickle序列化对象,但是python 2.x的pickle应该是不支持这种模式的序列化. 

解决的方法有这么几种: >

第一种:

    把执行的函数放在外面,这样就避免了把类的实例序列化.   如果不能直接放在外面的化,可以再用一个的函数来包装下. 

第二种:

    不使用进程池,而使用Process函数解决fork进程,这样也避免了pickle序列化对象.

#blog: xiaorui.cc

import multiprocessing
p = multiprocessing.Process(target=Go().run, args=(1, ))
p.start()
p.join()

第三种:

使用copy_reg将MethodType注册为可序列化的方法

#blog: xiaorui.cc

import copy_reg
import types

def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)

第四种,加入一个中间函数,使用getattr自省黑魔法调用真正的函数.  

#blog: xiaorui.cc

def proxy(instance, *args):
    getattr(instance(),'run')('xiaorui.cc')

END… 


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注