使用multiprocessing.reduction解决多进程传递文件描述符的问题
最近还真的是跟multiprocessing干上了,原本习惯使用os.fork的我,现在也开始被迫还是研究多进程multiprocessing带来的问题. 对于复杂的进程调度,我不太喜欢用multiprocessing,因为他封装过度,导致遇到问题就傻逼的看multiprocessing的源码.
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。
虽然到现在为止还是略懂python multiprocessing的实现,但在使用过程中还是会遇到些各种问题. 说句闲话,强烈推荐大家checkout gunicorn的代码, gunicorn对于多进程的管理很有一套.
说正题,同事在写一个回调计算服务,因为涉及到大量的相似度和正反面计算,所以考虑使用多进程做TCP服务。 代码的逻辑很简单,主进程去监听端口,当有请求来的时候,会把该请求扔给子进程去处理。
我再换个方式描述下场景,你有多个Python解释器进程在同时运行,你想将某个打开的文件描述符从一个解释器传递给另外一个。 比如,假设有个服务器进程相应连接请求,但是实际的相应逻辑是在另一个解释器中执行的。
搜了相关的文档,找到了解决的方法.
通常使用 multiprocessing 模块来创建这样的连接会更容易一些。一旦一个连接被创建,你可以使用 multiprocessing.reduction 中的 send_handle() 和 recv_handle() 函数在不同的处理器直接传递文件描述符。
通过ipython的help查看multiprocessing.reduction的文档说明,他的描述显得很直白,就是让多进程之间可以更好的传递socket对象信息,也就是文件描述符FD信息。
$ ipython
Type “help”, “copyright”, “credits” or “license” for more information.
>>> import multiprocessing.reduction
>>> help(multiprocessing.reduction)
Help on module multiprocessing.reduction in multiprocessing:
NAME
multiprocessing.reduction
FILE
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/reduction.py
MODULE DOCS
http://docs.python.org/library/multiprocessing.reduction
DESCRIPTION
# Module to allow connection and socket objects to be transferred
# between processes
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
Not much information, but at least we know where to look: multiprocessing/reduction.py
老路子,我们在看下multiprocessing.reduction的源码. 通过一些代码和官方的测试实例,reduction.py主要是把socket Fd给序列化,反序列化的过程。
对于我们用户来说,会怎么使用multiprocessing.reduction. 当父进程收到用户的请求时,他会在网络层把socket fd用reduce_handle函数处理掉, 然后塞入到队列中.
一堆的子进程通过队列取出一个对象,但这个对象不能直接当做socket使用,需要在使用rebuild_handle处理下. 最后在调用fromfd连接该socket.
对于multiprocessing来说,他其实会开启一个socket服务,在我们调用reduce_handle的时候,他会把socket fd信息复制到_cache里。当我们调用rebuild_handle反序列化对象的时候,他其实会连接我们先前的那个socket服务拉取他所相关联的信息。
原文首页位置,http://xiaorui.cc
#blog: xiaorui.cc def fromfd(fd, family, type_, proto=0): s = socket.fromfd(fd, family, type_, proto) if s.__class__ is not socket.socket: s = socket.socket(_sock=s) return s # # Functions to be used for pickling/unpickling objects with handles # def reduce_handle(handle): if Popen.thread_is_spawning(): return (None, Popen.duplicate_for_child(handle), True) dup_handle = duplicate(handle) _cache.add(dup_handle) sub_debug('reducing handle %d', handle) return (_get_listener().address, dup_handle, False) def rebuild_handle(pickled_data): address, handle, inherited = pickled_data if inherited: return handle sub_debug('rebuilding handle %d', handle) conn = Client(address, authkey=current_process().authkey) conn.send((handle, os.getpid())) new_handle = recv_handle(conn) conn.close() return new_handle
下面这是个完整的例子,详细的描述了 reduce_handle, rebuild_handle的用法.
#blog: xiaorui.cc import sys import SocketServer import Queue import time import socket import multiprocessing from multiprocessing.reduction import reduce_handle from multiprocessing.reduction import rebuild_handle class MultiprocessWorker(multiprocessing.Process): def __init__(self, sq): self.SLEEP_INTERVAL = 1 multiprocessing.Process.__init__(self) self.socket_queue = sq self.kill_received = False def run(self): while not self.kill_received: try: h = self.socket_queue.get_nowait() fd=rebuild_handle(h) client_socket=socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM) received = client_socket.recv(1024) print "Recieved on client: ",received client_socket.close() except Queue.Empty: pass time.sleep(self.SLEEP_INTERVAL) class TCPHandler(SocketServer.BaseRequestHandler): def handle(self): h = reduce_handle(self.request.fileno()) socket_queue.put(h) if __name__ == "__main__": address = ('localhost', 8080) server = SocketServer.TCPServer(address, TCPHandler) socket_queue = multiprocessing.Queue() for i in range(5): worker = MultiprocessWorker(socket_queue) worker.start()
multiprocessing reduction就这么一回事,当你有多进程之间互相传递socket fd的时候,记得考虑使用multiprocessing.reduction就可以了.