gevent通过socket补丁实现redis.py非阻塞

如题,gevent是使用socket补丁把redis.py改成异步非阻塞化. 异步非阻塞是个有点虚的话题,在python下使用gevent确实解决了io阻塞的问题。

业务场景需要python gevent redis.py的组合,因redis.py涉及到网络io的交互,果断的在开源社区找找gevent有关redis的模块。  


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.

http://xiaorui.cc/?p=3391

结果有些意思,在github中有关gevent redis的模块一共就3 4个,光看文档就知道不怎么好用。 我用gevent也有5年的年头了,还真没有特意用非阻塞的redis模块。理由是redis本身够快,以前的需求都是那种hash、zset、list的操作。但这次需求不同,大量的使用堵塞性质的命令,比如redis的brpop. 


记得gevent1.0之后就有socket补丁支持,我在想能否替换redis.py ‘ socket模块的方法来实现redis非阻塞。 最后一段折腾,先是翻看redis.py源码,找到创建连接并与连接做send、recv交互的代码,然后逐步测试,果然成功了…  只需要用gevent.socket替换redis.connection.socket就可以了。 这个替换方式也是gevent官方提倡的猴子补丁思想。

下面是gevent redis.py结合方法,  步骤简单…

#xiaorui.cc
from gevent import socket as gsocket
import gevent
import redis
import redis.connection
redis.connection.socket = gsocket

r = redis.StrictRedis()
p = r.connection_pool

下面是gevent socket代码实现

file: gevent.socket.py

有这么几个点

if PY3:
    from gevent import _socket3 as _source #
else:
    from gevent import _socket2 as _source

_source.create_connection = create_connection

大多数人还是python 2.7的.   我们看看gevent._socket2.py做了什么 .

#xiaorui.cc

class socket(object):
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
        if _sock is None:
            self._sock = _realsocket(family, type, proto)
            self.timeout = _socket.getdefaulttimeout()
        else:
            ...
        self._sock.setblocking(0)  #非阻塞
        fileno = self._sock.fileno()  #拿到文件描述符
        self.hub = get_hub()          #创建hub调度器
        io = self.hub.loop.io
        self._read_event = io(fileno, 1)   #读事件
        self._write_event = io(fileno, 2)  #写事件


    def _wait(self, watcher, timeout_exc=timeout('timed out')):
        if watcher.callback is not None:
            raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
        if self.timeout is not None:
            timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
        else:
            timeout = None
        try:
            self.hub.wait(watcher)
        finally:
            if timeout is not None:
                timeout.cancel()


    def recv(self, *args):
        sock = self._sock
        while True:
            try:
                return sock.recv(*args)
            except error as ex:
                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:    #如果不属于数据没就绪的异常 或者  属于超时,就raise异常.
                    raise
                sys.exc_clear()
            self._wait(self._read_event)          #循环读,如果有数据立马return返回,没有数据就通过_wait来等待事件

    def send(self, data, flags=0, timeout=timeout_default):
        sock = self._sock
        if timeout is timeout_default:
            timeout = self.timeout
        try:
            return sock.send(data, flags)
        except error as ex:
            if ex.args[0] != EWOULDBLOCK or timeout == 0.0:    #EWOULDBLOCK 这个代表的是数据还没有就绪,然后异常不等于EWOULDBLOCK or EAGAIN,那么他就是连接异常了.
                raise
            sys.exc_clear()
            self._wait(self._write_event)     #同recv,通过_wait等待写事件的触发.
            try:
                return sock.send(data, flags)
            except error as ex2:
                if ex2.args[0] == EWOULDBLOCK:   #说明写缓冲区满了了. 
                    return 0
                raise

   def close(self, _closedsocket=_closedsocket, cancel_wait_ex=cancel_wait_ex):
       self.hub.cancel_wait(self._read_event, cancel_wait_ex)    #关闭读事件
       self.hub.cancel_wait(self._write_event, cancel_wait_ex)   #关闭写事件
       s = self._sock
       self._sock = _closedsocket()
       ...

这次不聊gevent switch hub的调度层实现源码 ,整备下次出几篇连载的文章专门死磕gevent的调度器实现。 


话说最近再写一个python的协程io调度器,越发觉得有点吃力了。 单单给自己用,那么实现起来简单。 但是封装起来就不同了,想让执行调度代码看起来美观清爽更加难上加难,各种的python黑魔法来支撑框架的友好度… 中间遇到各种的memory leak…. 


END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

电子邮件地址不会被公开。 必填项已用*标注