如题,gevent是使用socket补丁把redis.py改成异步非阻塞化. 异步非阻塞是个有点虚的话题,在python下使用gevent确实解决了io阻塞的问题。
业务场景需要python gevent redis.py的组合,因redis.py涉及到网络io的交互,果断的在开源社区找找gevent有关redis的模块。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.
结果有些意思,在github中有关gevent redis的模块一共就3 4个,光看文档就知道不怎么好用。 我用gevent也有5年的年头了,还真没有特意用非阻塞的redis模块。理由是redis本身够快,以前的需求都是那种hash、zset、list的操作。但这次需求不同,大量的使用堵塞性质的命令,比如redis的brpop.
记得gevent1.0之后就有socket补丁支持,我在想能否替换redis.py ‘ socket模块的方法来实现redis非阻塞。 最后一段折腾,先是翻看redis.py源码,找到创建连接并与连接做send、recv交互的代码,然后逐步测试,果然成功了… 只需要用gevent.socket替换redis.connection.socket就可以了。 这个替换方式也是gevent官方提倡的猴子补丁思想。
下面是gevent redis.py结合方法, 步骤简单…
#xiaorui.cc from gevent import socket as gsocket import gevent import redis import redis.connection redis.connection.socket = gsocket r = redis.StrictRedis() p = r.connection_pool
下面是gevent socket代码实现
file: gevent.socket.py
有这么几个点 if PY3: from gevent import _socket3 as _source # else: from gevent import _socket2 as _source _source.create_connection = create_connection
大多数人还是python 2.7的. 我们看看gevent._socket2.py做了什么 .
#xiaorui.cc class socket(object): def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None): if _sock is None: self._sock = _realsocket(family, type, proto) self.timeout = _socket.getdefaulttimeout() else: ... self._sock.setblocking(0) #非阻塞 fileno = self._sock.fileno() #拿到文件描述符 self.hub = get_hub() #创建hub调度器 io = self.hub.loop.io self._read_event = io(fileno, 1) #读事件 self._write_event = io(fileno, 2) #写事件 def _wait(self, watcher, timeout_exc=timeout('timed out')): if watcher.callback is not None: raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, )) if self.timeout is not None: timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False) else: timeout = None try: self.hub.wait(watcher) finally: if timeout is not None: timeout.cancel() def recv(self, *args): sock = self._sock while True: try: return sock.recv(*args) except error as ex: if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0: #如果不属于数据没就绪的异常 或者 属于超时,就raise异常. raise sys.exc_clear() self._wait(self._read_event) #循环读,如果有数据立马return返回,没有数据就通过_wait来等待事件 def send(self, data, flags=0, timeout=timeout_default): sock = self._sock if timeout is timeout_default: timeout = self.timeout try: return sock.send(data, flags) except error as ex: if ex.args[0] != EWOULDBLOCK or timeout == 0.0: #EWOULDBLOCK 这个代表的是数据还没有就绪,然后异常不等于EWOULDBLOCK or EAGAIN,那么他就是连接异常了. raise sys.exc_clear() self._wait(self._write_event) #同recv,通过_wait等待写事件的触发. try: return sock.send(data, flags) except error as ex2: if ex2.args[0] == EWOULDBLOCK: #说明写缓冲区满了了. return 0 raise def close(self, _closedsocket=_closedsocket, cancel_wait_ex=cancel_wait_ex): self.hub.cancel_wait(self._read_event, cancel_wait_ex) #关闭读事件 self.hub.cancel_wait(self._write_event, cancel_wait_ex) #关闭写事件 s = self._sock self._sock = _closedsocket() ...
这次不聊gevent switch hub的调度层实现源码 ,整备下次出几篇连载的文章专门死磕gevent的调度器实现。
话说最近再写一个python的协程io调度器,越发觉得有点吃力了。 单单给自己用,那么实现起来简单。 但是封装起来就不同了,想让执行调度代码看起来美观清爽更加难上加难,各种的python黑魔法来支撑框架的友好度… 中间遇到各种的memory leak….
END.