关于gevent queue遭遇hub.LoopExit问题

我的上下文

gevent.hub.LoopExit: This operation would block forever  ,   使用过gevent queue的朋友估摸会遇到这类问题把?   前两天写个小东西的时候,又再次遇到了block forever 问题..    

报错信息

gevent.hub.LoopExit: This operation would block forever

这报错是什么意思?  gevent的hub调度器要一直循环堵塞着, 直到任务协程确保退出…  然而,你没有合理的join堵塞协程,导致调度器自个异常退出. 


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.

http://xiaorui.cc/?p=3717


我的业务有些繁杂,不太好作为例子讲述给大家,所以精简了一段gevent queue测试脚本.

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import random

import gevent
from gevent.queue import *

import gevent.monkey
gevent.monkey.patch_all(thread=False)


q = Queue()
workers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid
    return


def worker(wid):
    while True:
        try:
            item = q.get()
            if item:
                do_work(wid, item)
        except StopIteration:
            break
        except:
            print "end"


def producer():
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 888888)))

    for item in range(1, 9):
         q.put(item)


producer()
gevent.joinall(workers)


这样执行的代码肯定会报错的….  error信息如下.

#xiaorui.cc

Traceback (most recent call last):
  File "qq.py", line 53, in <module>
    gevent.joinall(workers)
  File "/Library/Python/2.7/site-packages/gevent/greenlet.py", line 400, in joinall
    wait(greenlets, timeout=timeout)
  File "/Library/Python/2.7/site-packages/gevent/hub.py", line 645, in wait
    return list(iwait(objects, timeout))
  File "/Library/Python/2.7/site-packages/gevent/hub.py", line 598, in iwait
    item = waiter.get()
  File "/Library/Python/2.7/site-packages/gevent/hub.py", line 568, in get
    return self.hub.switch()
  File "/Library/Python/2.7/site-packages/gevent/hub.py", line 331, in switch
    return greenlet.switch(self)
gevent.hub.LoopExit: This operation would block forever

通过代码调试我们可以确定在运行gevent queue相关操作出现了问题.    我们进一步瞅瞅gevent queue的源码实现.

Queue定义了数据事件队列,getters和putters,他们使用了扩展数据类型的双端队列,  另外在构造函数里构建了hub调度器和锁标志. 

get()方法默认是block的,python queue标注库的get方法也是默认堵塞的,也就是说我单个线程一直堵塞到有任务返回.   我们清楚经过gevent所patch库不存在堵塞这么一说,这也是gevent的魅力所在.  

官方的github里有讲述虽然gevent queue默认给你个堵塞的参数,但gevent hub调取不会让你某个协程独占线程的cpu执行资源,肯定会把该任务switch切换到hub里。  我们通过_get类型方法调用__get_or_peek方法,如果qsize()为真,那么就返回结果.   你的block参数决定你是否可以立马返回.   你到了waiter =  Waiter()这一步,说明队列是空的,然后你的block为True。 这时候get_or_peek给你加几个Waiter等待事件并且扔到self.getters事件池里.    如果你timeout值,他会向hub主循环调度里面注册个超时时间, 回调函数为   Empty = __queue__.Empty .

class Queue(object):
    def __init__(self, maxsize=None, items=None):
        ...
        self.getters = collections.deque()
        self.putters = collections.deque()
        self.hub = get_hub()
        self._event_unlock = None
        ,,,

    def __get_or_peek(self, method, block, timeout):
        if self.hub is getcurrent():
            while self.putters:
                # Note: get() used popleft(), peek used pop(); popleft
                # is almost certainly correct.
                self.putters.popleft().put_and_switch()
                if self.qsize():
                    return method()
            raise Empty()

        if not block:
            raise Empty()
        waiter = Waiter()
        timeout = Timeout._start_new_or_dummy(timeout, Empty)
        try:
            self.getters.append(waiter)
            if self.putters:
                self._schedule_unlock()
            result = waiter.get()
            if result is not waiter:
                raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, ))
            return method()
        finally:
            timeout.cancel()
            _safe_remove(self.getters, waiter)

    def get(self, block=True, timeout=None):
        ...
        if self.qsize():
            if self.putters:
                self._schedule_unlock()
            return self._get()

        return self.__get_or_peek(self._get, block, timeout)

    def get_nowait(self):

        ...  #非阻塞方法
        return self.get(False)

    ....

这个跟咱们的报错是息息相关的,因为我的任务队列已经被那几个协程给消费完了,那几个协程都在做waiter操作, 这样会造成gevent的join失败, 因为你的任务流已经不存在了,都挂在queue队列的一组waiter列表里面。 你在回头看看那些报错信息,可以品味到waiter惹的祸…  此外你的那几个任务可以使用切换到自身的抽象逻辑,最简单就是加个gevent.sleep()…    这地方有些不好理解,大家可以看下gevent的调度原理.

解决的方法

解决方法有这么几个,不让他使用waiter事件,或者有一个任务一直再跑着.  

我直接采用非阻塞的方法来取结果,不管有没有都会立刻给我一个答复.   可以选择性使用gevent.sleep()减少间隔. 

#xiaorui.cc q.get(block=False)

q.get_nowait()

需要有一个协程任务一直在跑着,这样gevent hub就可以join住整组任务流.  当然我相信你不会选择用这个方法. 

def ss():
    while 1:
        gevent.sleep(1)

s = gevent.spawn(ss)
workers.append(s)

除此之外,gevent为这类生产者消费者的工作类型,扩展了一个可join的队列。 

原理也简单,JoinableQueue继承父类Queue,构造一个“未完成”的计数器,每次put任务的时候需要加一个计数, 每次触发task_done的时候才会减一。 当unfinished_tasks 为 0时,重置 self._cond.set() .

当我们执行JoinableQueue join方法时会堵塞到gevent event事件被重置才会返回…   所有,该函数运行的位置还是很重要的. 

class JoinableQueue(Queue):

    def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
        from gevent.event import Event
        Queue.__init__(self, maxsize, items)
        self._cond = Event()
        self._cond.set()

        if unfinished_tasks:
            self.unfinished_tasks = unfinished_tasks
        elif items:
            self.unfinished_tasks = len(items)
        else:
            self.unfinished_tasks = 0
        ....

    def _put(self, item):
        Queue._put(self, item)
        self.unfinished_tasks += 1
        self._cond.clear()
        ...

    def task_done(self):
        if self.unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self.unfinished_tasks -= 1
        if self.unfinished_tasks == 0:
            self._cond.set()

    def join(self, timeout=None):
        return self._cond.wait(timeout=timeout)

我在stackoverflow看到有些老外对这个”gevent join block forever” 很是反感,有人把这他为gevent的一个调度bug。   其实我也是这么想的。 jamadden给予的答复是,请合理使用queue….    

这次又读了gevent hub的主调度代码,有些心得。。。 下次分享出来.


END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

4 Responses

  1. 2016年11月24日 / 下午6:04

    谢谢分享,可搞定了

  2. 2016年9月6日 / 下午11:22

  3. 大神 2016年8月10日 / 下午7:40

    厉害呀,大神!

发表评论

邮箱地址不会被公开。 必填项已用*标注