用multiprocessing.reduction解决多进程传递文件描述符

使用multiprocessing.reduction解决多进程传递文件描述符的问题

最近还真的是跟multiprocessing干上了,原本习惯使用os.fork的我,现在也开始被迫还是研究多进程multiprocessing带来的问题. 对于复杂的进程调度,我不太喜欢用multiprocessing,因为他封装过度,导致遇到问题就傻逼的看multiprocessing的源码. 


文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。

http://xiaorui.cc/2016/01/19/%E7%94%A8multiprocessing-reduction%E8%A7%A3%E5%86%B3%E5%A4%9A%E8%BF%9B%E7%A8%8B%E4%BC%A0%E9%80%92%E6%96%87%E4%BB%B6%E6%8F%8F%E8%BF%B0%E7%AC%A6/

虽然到现在为止还是略懂python multiprocessing的实现,但在使用过程中还是会遇到些各种问题.  说句闲话,强烈推荐大家checkout gunicorn的代码, gunicorn对于多进程的管理很有一套. 

说正题,同事在写一个回调计算服务,因为涉及到大量的相似度和正反面计算,所以考虑使用多进程做TCP服务。  代码的逻辑很简单,主进程去监听端口,当有请求来的时候,会把该请求扔给子进程去处理。

我再换个方式描述下场景,你有多个Python解释器进程在同时运行,你想将某个打开的文件描述符从一个解释器传递给另外一个。 比如,假设有个服务器进程相应连接请求,但是实际的相应逻辑是在另一个解释器中执行的。

搜了相关的文档,找到了解决的方法.

通常使用 multiprocessing 模块来创建这样的连接会更容易一些。一旦一个连接被创建,你可以使用 multiprocessing.reduction 中的 send_handle() 和 recv_handle() 函数在不同的处理器直接传递文件描述符。 

通过ipython的help查看multiprocessing.reduction的文档说明,他的描述显得很直白,就是让多进程之间可以更好的传递socket对象信息,也就是文件描述符FD信息。

$ ipython

Type “help”, “copyright”, “credits” or “license” for more information.
>>> import multiprocessing.reduction
>>> help(multiprocessing.reduction)

Help on module multiprocessing.reduction in multiprocessing:

NAME
    multiprocessing.reduction

FILE
    /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/reduction.py

MODULE DOCS
    http://docs.python.org/library/multiprocessing.reduction

DESCRIPTION
    # Module to allow connection and socket objects to be transferred
    # between processes
    #
    # multiprocessing/reduction.py
    #
    # Copyright (c) 2006-2008, R Oudkerk
    # All rights reserved.
    #
    # Redistribution and use in source and binary forms, with or without
    # modification, are permitted provided that the following conditions
    # are met:
    #

Not much information, but at least we know where to look: multiprocessing/reduction.py

老路子,我们在看下multiprocessing.reduction的源码. 通过一些代码和官方的测试实例,reduction.py主要是把socket Fd给序列化,反序列化的过程。

对于我们用户来说,会怎么使用multiprocessing.reduction.  当父进程收到用户的请求时,他会在网络层把socket fd用reduce_handle函数处理掉, 然后塞入到队列中.
一堆的子进程通过队列取出一个对象,但这个对象不能直接当做socket使用,需要在使用rebuild_handle处理下. 最后在调用fromfd连接该socket.

对于multiprocessing来说,他其实会开启一个socket服务,在我们调用reduce_handle的时候,他会把socket fd信息复制到_cache里。当我们调用rebuild_handle反序列化对象的时候,他其实会连接我们先前的那个socket服务拉取他所相关联的信息。

原文首页位置,http://xiaorui.cc

#blog: xiaorui.cc

def fromfd(fd, family, type_, proto=0):
    s = socket.fromfd(fd, family, type_, proto)
    if s.__class__ is not socket.socket:
        s = socket.socket(_sock=s)
    return s

#
# Functions to be used for pickling/unpickling objects with handles
#

def reduce_handle(handle):
    if Popen.thread_is_spawning():
        return (None, Popen.duplicate_for_child(handle), True)
    dup_handle = duplicate(handle)
    _cache.add(dup_handle)
    sub_debug('reducing handle %d', handle)
    return (_get_listener().address, dup_handle, False)

def rebuild_handle(pickled_data):
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().authkey)
    conn.send((handle, os.getpid()))
    new_handle = recv_handle(conn)
    conn.close()
    return new_handle

下面这是个完整的例子,详细的描述了 reduce_handle, rebuild_handle的用法. 

#blog: xiaorui.cc

import sys
import SocketServer
import Queue
import time
import socket
import multiprocessing
from multiprocessing.reduction import reduce_handle
from multiprocessing.reduction import rebuild_handle

class MultiprocessWorker(multiprocessing.Process):

    def __init__(self, sq):
        self.SLEEP_INTERVAL = 1
        multiprocessing.Process.__init__(self)
        self.socket_queue = sq
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            try:
                h = self.socket_queue.get_nowait()
                fd=rebuild_handle(h)
                client_socket=socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM)
                received = client_socket.recv(1024)
                print "Recieved on client: ",received
                client_socket.close()

            except Queue.Empty:
                pass

            time.sleep(self.SLEEP_INTERVAL)

class TCPHandler(SocketServer.BaseRequestHandler):

    def handle(self):

        h = reduce_handle(self.request.fileno())
        socket_queue.put(h)


if __name__ == "__main__":

    address =  ('localhost', 8080)
    server = SocketServer.TCPServer(address, TCPHandler)
    socket_queue = multiprocessing.Queue()

    for i in range(5):
        worker = MultiprocessWorker(socket_queue)
        worker.start()

multiprocessing reduction就这么一回事,当你有多进程之间互相传递socket fd的时候,记得考虑使用multiprocessing.reduction就可以了. 


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注