我的上下文
“ gevent.hub.LoopExit: This operation would block forever” , 使用过gevent queue的朋友估摸会遇到这类问题把? 前两天写个小东西的时候,又再次遇到了block forever 问题..
报错信息
gevent.hub.LoopExit: This operation would block forever
这报错是什么意思? gevent的hub调度器要一直循环堵塞着, 直到任务协程确保退出… 然而,你没有合理的join堵塞协程,导致调度器自个异常退出.
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.
我的业务有些繁杂,不太好作为例子讲述给大家,所以精简了一段gevent queue测试脚本.
#!/usr/bin/env python # -*- coding:utf-8 -*- import time import random import gevent from gevent.queue import * import gevent.monkey gevent.monkey.patch_all(thread=False) q = Queue() workers = [] def do_work(wid, value): gevent.sleep(random.randint(0,2)) print 'Task', value, 'done', wid return def worker(wid): while True: try: item = q.get() if item: do_work(wid, item) except StopIteration: break except: print "end" def producer(): for i in range(4): workers.append(gevent.spawn(worker, random.randint(1, 888888))) for item in range(1, 9): q.put(item) producer() gevent.joinall(workers)
这样执行的代码肯定会报错的…. error信息如下.
#xiaorui.cc Traceback (most recent call last): File "qq.py", line 53, in <module> gevent.joinall(workers) File "/Library/Python/2.7/site-packages/gevent/greenlet.py", line 400, in joinall wait(greenlets, timeout=timeout) File "/Library/Python/2.7/site-packages/gevent/hub.py", line 645, in wait return list(iwait(objects, timeout)) File "/Library/Python/2.7/site-packages/gevent/hub.py", line 598, in iwait item = waiter.get() File "/Library/Python/2.7/site-packages/gevent/hub.py", line 568, in get return self.hub.switch() File "/Library/Python/2.7/site-packages/gevent/hub.py", line 331, in switch return greenlet.switch(self) gevent.hub.LoopExit: This operation would block forever
通过代码调试我们可以确定在运行gevent queue相关操作出现了问题. 我们进一步瞅瞅gevent queue的源码实现.
Queue定义了数据事件队列,getters和putters,他们使用了扩展数据类型的双端队列, 另外在构造函数里构建了hub调度器和锁标志.
get()方法默认是block的,python queue标注库的get方法也是默认堵塞的,也就是说我单个线程一直堵塞到有任务返回. 我们清楚经过gevent所patch库不存在堵塞这么一说,这也是gevent的魅力所在.
官方的github里有讲述虽然gevent queue默认给你个堵塞的参数,但gevent hub调取不会让你某个协程独占线程的cpu执行资源,肯定会把该任务switch切换到hub里。 我们通过_get类型方法调用__get_or_peek方法,如果qsize()为真,那么就返回结果. 你的block参数决定你是否可以立马返回. 你到了waiter = Waiter()这一步,说明队列是空的,然后你的block为True。 这时候get_or_peek给你加几个Waiter等待事件并且扔到self.getters事件池里. 如果你timeout值,他会向hub主循环调度里面注册个超时时间, 回调函数为 Empty = __queue__.Empty .
class Queue(object): def __init__(self, maxsize=None, items=None): ... self.getters = collections.deque() self.putters = collections.deque() self.hub = get_hub() self._event_unlock = None ,,, def __get_or_peek(self, method, block, timeout): if self.hub is getcurrent(): while self.putters: # Note: get() used popleft(), peek used pop(); popleft # is almost certainly correct. self.putters.popleft().put_and_switch() if self.qsize(): return method() raise Empty() if not block: raise Empty() waiter = Waiter() timeout = Timeout._start_new_or_dummy(timeout, Empty) try: self.getters.append(waiter) if self.putters: self._schedule_unlock() result = waiter.get() if result is not waiter: raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, )) return method() finally: timeout.cancel() _safe_remove(self.getters, waiter) def get(self, block=True, timeout=None): ... if self.qsize(): if self.putters: self._schedule_unlock() return self._get() return self.__get_or_peek(self._get, block, timeout) def get_nowait(self): ... #非阻塞方法 return self.get(False) ....
这个跟咱们的报错是息息相关的,因为我的任务队列已经被那几个协程给消费完了,那几个协程都在做waiter操作, 这样会造成gevent的join失败, 因为你的任务流已经不存在了,都挂在queue队列的一组waiter列表里面。 你在回头看看那些报错信息,可以品味到waiter惹的祸… 此外你的那几个任务可以使用切换到自身的抽象逻辑,最简单就是加个gevent.sleep()… 这地方有些不好理解,大家可以看下gevent的调度原理.
解决的方法
解决方法有这么几个,不让他使用waiter事件,或者有一个任务一直再跑着.
我直接采用非阻塞的方法来取结果,不管有没有都会立刻给我一个答复. 可以选择性使用gevent.sleep()减少间隔.
#xiaorui.cc q.get(block=False) q.get_nowait()
需要有一个协程任务一直在跑着,这样gevent hub就可以join住整组任务流. 当然我相信你不会选择用这个方法.
def ss(): while 1: gevent.sleep(1) s = gevent.spawn(ss) workers.append(s)
除此之外,gevent为这类生产者消费者的工作类型,扩展了一个可join的队列。
原理也简单,JoinableQueue继承父类Queue,构造一个“未完成”的计数器,每次put任务的时候需要加一个计数, 每次触发task_done的时候才会减一。 当unfinished_tasks 为 0时,重置 self._cond.set() .
当我们执行JoinableQueue join方法时会堵塞到gevent event事件被重置才会返回… 所有,该函数运行的位置还是很重要的.
class JoinableQueue(Queue): def __init__(self, maxsize=None, items=None, unfinished_tasks=None): from gevent.event import Event Queue.__init__(self, maxsize, items) self._cond = Event() self._cond.set() if unfinished_tasks: self.unfinished_tasks = unfinished_tasks elif items: self.unfinished_tasks = len(items) else: self.unfinished_tasks = 0 .... def _put(self, item): Queue._put(self, item) self.unfinished_tasks += 1 self._cond.clear() ... def task_done(self): if self.unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self.unfinished_tasks -= 1 if self.unfinished_tasks == 0: self._cond.set() def join(self, timeout=None): return self._cond.wait(timeout=timeout)
我在stackoverflow看到有些老外对这个”gevent join block forever” 很是反感,有人把这他为gevent的一个调度bug。 其实我也是这么想的。 jamadden给予的答复是,请合理使用queue….
这次又读了gevent hub的主调度代码,有些心得。。。 下次分享出来.
END.
谢谢分享,可搞定了
萨
啥意思?
厉害呀,大神!