如题,gevent是使用socket补丁把redis.py改成异步非阻塞化. 异步非阻塞是个有点虚的话题,在python下使用gevent确实解决了io阻塞的问题。
业务场景需要python gevent redis.py的组合,因redis.py涉及到网络io的交互,果断的在开源社区找找gevent有关redis的模块。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.
结果有些意思,在github中有关gevent redis的模块一共就3 4个,光看文档就知道不怎么好用。 我用gevent也有5年的年头了,还真没有特意用非阻塞的redis模块。理由是redis本身够快,以前的需求都是那种hash、zset、list的操作。但这次需求不同,大量的使用堵塞性质的命令,比如redis的brpop.
记得gevent1.0之后就有socket补丁支持,我在想能否替换redis.py ‘ socket模块的方法来实现redis非阻塞。 最后一段折腾,先是翻看redis.py源码,找到创建连接并与连接做send、recv交互的代码,然后逐步测试,果然成功了… 只需要用gevent.socket替换redis.connection.socket就可以了。 这个替换方式也是gevent官方提倡的猴子补丁思想。
下面是gevent redis.py结合方法, 步骤简单…
#xiaorui.cc from gevent import socket as gsocket import gevent import redis import redis.connection redis.connection.socket = gsocket r = redis.StrictRedis() p = r.connection_pool
下面是gevent socket代码实现
file: gevent.socket.py
有这么几个点
if PY3:
from gevent import _socket3 as _source #
else:
from gevent import _socket2 as _source
_source.create_connection = create_connection
大多数人还是python 2.7的. 我们看看gevent._socket2.py做了什么 .
#xiaorui.cc
class socket(object):
def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
if _sock is None:
self._sock = _realsocket(family, type, proto)
self.timeout = _socket.getdefaulttimeout()
else:
...
self._sock.setblocking(0) #非阻塞
fileno = self._sock.fileno() #拿到文件描述符
self.hub = get_hub() #创建hub调度器
io = self.hub.loop.io
self._read_event = io(fileno, 1) #读事件
self._write_event = io(fileno, 2) #写事件
def _wait(self, watcher, timeout_exc=timeout('timed out')):
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
if self.timeout is not None:
timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
else:
timeout = None
try:
self.hub.wait(watcher)
finally:
if timeout is not None:
timeout.cancel()
def recv(self, *args):
sock = self._sock
while True:
try:
return sock.recv(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0: #如果不属于数据没就绪的异常 或者 属于超时,就raise异常.
raise
sys.exc_clear()
self._wait(self._read_event) #循环读,如果有数据立马return返回,没有数据就通过_wait来等待事件
def send(self, data, flags=0, timeout=timeout_default):
sock = self._sock
if timeout is timeout_default:
timeout = self.timeout
try:
return sock.send(data, flags)
except error as ex:
if ex.args[0] != EWOULDBLOCK or timeout == 0.0: #EWOULDBLOCK 这个代表的是数据还没有就绪,然后异常不等于EWOULDBLOCK or EAGAIN,那么他就是连接异常了.
raise
sys.exc_clear()
self._wait(self._write_event) #同recv,通过_wait等待写事件的触发.
try:
return sock.send(data, flags)
except error as ex2:
if ex2.args[0] == EWOULDBLOCK: #说明写缓冲区满了了.
return 0
raise
def close(self, _closedsocket=_closedsocket, cancel_wait_ex=cancel_wait_ex):
self.hub.cancel_wait(self._read_event, cancel_wait_ex) #关闭读事件
self.hub.cancel_wait(self._write_event, cancel_wait_ex) #关闭写事件
s = self._sock
self._sock = _closedsocket()
...
这次不聊gevent switch hub的调度层实现源码 ,整备下次出几篇连载的文章专门死磕gevent的调度器实现。
话说最近再写一个python的协程io调度器,越发觉得有点吃力了。 单单给自己用,那么实现起来简单。 但是封装起来就不同了,想让执行调度代码看起来美观清爽更加难上加难,各种的python黑魔法来支撑框架的友好度… 中间遇到各种的memory leak….
END.
