python rq任务队列是如何实现优先级队列 [上]

这段时间跟同事聊了些任务优先级队列的话题,正好我们这把大量的celery抛弃,开始用rq来做任务队列,因为celery太大,所以改造起来甚是恶心到爆,还不如选择一个简单干练的python rq。

xiaorui.cc

xiaorui.cc

xiaorui.cc

xiaorui.cc


微信的抓取api,我是用django写的,任务队列也是用django-rq,定时任务也是用的rq…. 

看了一些rq的源码实现,看得过程正好和同事谈论到任务的优先级队列,基于redis的实现。  rq的名字由来就是 redis queue….

high = Queue(‘high’, default_timeout=8)  # 8 secs
low = Queue(‘low’, default_timeout=600)  # 10 mins
low.enqueue_call(really_really_slow, timeout=3600)  #

他的实现方式比较的简单,你可以配置n个队列,根据你的业务逻辑产生n个评判的等级,然后根据你所得到的等级扔到不同的队列里面。 


high    :    <=== [][][][][][][]  <===!
middle  :                                      <=== [][][][][][][]  <===!
low     :                                                                <======== [][][][][][][]  <=======!


    @classmethod
    def dequeue_any(cls, queues, timeout, connection=None):
        """Class method returning the job_class instance at the front of the given
        set of Queues, where the order of the queues is important.

        When all of the Queues are empty, depending on the `timeout` argument,
        either blocks execution of this function for the duration of the
        timeout or until new messages arrive on any of the queues, or returns
        None.

        See the documentation of cls.lpop for the interpretation of timeout.
        """
        queue_keys = [q.key for q in queues]
        result = cls.lpop(queue_keys, timeout, connection=connection)
        if result is None:
            return None
        queue_key, job_id = map(as_text, result)
        queue = cls.from_queue_key(queue_key, connection=connection)
        try:
            job = cls.job_class.fetch(job_id, connection=connection)
        except NoSuchJobError:
            # Silently pass on jobs that don't exist (anymore),
            # and continue by reinvoking the same function recursively
            return cls.dequeue_any(queues, timeout, connection=connection)
        except UnpickleError as e:
            # Attach queue information on the exception for improved error
            # reporting
            e.job_id = job_id
            e.queue = queue
            raise e
        return job, queue

python rq把redis的一些标准语法也改掉了,比如lpop,他就是进行迭代的取出你的优先级队列。


  @classmethod
    def lpop(cls, queue_keys, timeout, connection=None):
        connection = resolve_connection(connection)
        if timeout is not None:  # blocking variant
            if timeout == 0:
                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.')
            result = connection.blpop(queue_keys, timeout)
            if result is None:
                raise DequeueTimeout(timeout, queue_keys)
            queue_key, job_id = result
            return queue_key, job_id
        else:  # non-blocking variant
            for queue_key in queue_keys:
                blob = connection.lpop(queue_key)
                if blob is not None:
                    return queue_key, blob
            return None

其实除了用多个list结构来实现多层的任务队列,我们还是可以用 sorted set来实现的…. 

他相比于用list做任务队列的好处在于,他含有score分值的概念,这个分值也可以是时间戳,那么咱们用redis的zrange或者ZRANGEBYSCORE过滤出一定时间范围内的数据。

#0就算开头了,-1 肯定是结尾了
redis 127.0.0.1:6379> zrange myzset 0 -1 withscores

1) “one”

2) “1”

3) “two”

4) “3”

返回集合中score在给定区间的元素
#这里是返回2和3区间的数据
redis 127.0.0.1:6379> zrangebyscore myzset3 2 3 withscores

1) “two”

2) “2”

3) “three”

4) “3”

而外在提下,sorted是可以出现同一个score的,但是value一定不能一样…. 

相同的score,这情况还是会出现的,比如当score是时间戳的时候…. 

redis 127.0.0.1:6379> zadd xiaorui.cc 1 1
(integer) 1
redis 127.0.0.1:6379> zadd xiaorui.cc 1 ‘one’
(integer) 1
redis 127.0.0.1:6379> zadd xiaorui.cc 2 ‘2’
(integer) 1
redis 127.0.0.1:6379> zadd xiaorui.cc 2 ‘two’
(integer) 1

redis 127.0.0.1:6379> ZRANGE xiaorui.cc 0 -1 withscores
1) “1”
2) “1”
3) “one”
4) “1”
5) “2”
6) “2”
7) “two”
8) “2”

咱们在看看rq的worker是如何运行的…

       try:
           while True:
               try:
                   self.check_for_suspension(burst)

                   if self.stopped:
                       self.log.info('Stopping on request.')
                       break

                   timeout = None if burst else max(1, self.default_worker_ttl - 60)

                   result = self.dequeue_job_and_maintain_ttl(timeout)
                   if result is None:
                       break
               except StopRequested:
                   break

                job, queue = result
                self.execute_job(job)
                self.heartbeat()

                if job.get_status() == JobStatus.FINISHED:
                    queue.enqueue_dependents(job)

最主要的思想就是 他会不停的取出任务,调用execute_job(job),  函数fork一个子进程,这也是咱们每次修改完程序,不用再重启python rq的原因。 

我这里简单说下,我的同事朱伟读了我这篇的文章,指出了我的一个观点,这个观点就是在一个主进程里面,你不管怎么fork,他的子进程的环境是集成于主环境的。 所以,他的自动重载估计是做了reload得机制…

    def execute_job(self, job):
        child_pid = os.fork()
        if child_pid == 0:
            self.main_work_horse(job)
        else:
            self._horse_pid = child_pid
            self.procline('Forked %d at %d' % (child_pid, time.time()))
            while True:
                try:
                    self.set_state('busy')
                    os.waitpid(child_pid, 0)
                    self.set_state('idle')
                    break
                except OSError as e:
                    if e.errno != errno.EINTR:
                        raise

python rq的默认执行时间是360秒,上星期跑大量的数据时,发现大量的timeout的error….  原本以为enqueue的时候,不加timeout参数,rq默认应该不会限制时间,结果他是420s-60s…  这个数字在rq的官网居然没找到…   看了下work.py源码,他是这么说的。。。

DEFAULT_WORKER_TTL = 420
DEFAULT_RESULT_TTL = 500

timeout = None if burst else max(1, self.default_worker_ttl - 60)

result = self.dequeue_job_and_maintain_ttl(timeout)

通过rq的job类,设置和查询每个jobid的信息,在每个流程下,rq都会记录下他的过程的。 当我的子进程正在执行任务之前,我会发出一个busy的信息,这样work主调度逻辑会pass,不会每次都fork进程。

def _get_state(self):

    warnings.warn(
        "worker.state is deprecated, use worker.get_state() instead.",
        DeprecationWarning
    )
    return self.get_state()

state = property(_get_state, _set_state)

def set_current_job_id(self, job_id, pipeline=None):
    connection = pipeline if pipeline is not None else self.connection

    if job_id is None:
        connection.hdel(self.key, 'current_job')
    else:
        connection.hset(self.key, 'current_job', job_id)

def get_current_job_id(self, pipeline=None):
    connection = pipeline if pipeline is not None else self.connection
    return as_text(connection.hget(self.key, 'current_job'))

def get_current_job(self):
    """Returns the job id of the currently executing job."""
    job_id = self.get_current_job_id()

    if job_id is None:
        return None

    return self.job_class.fetch(job_id, self.connection)


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

4 Responses

  1. virtue 2015年12月22日 / 下午4:24

    请问发现大量的timeout的error…,这个如何解决,timeout的含义是什么,是等待timeout固定时间,没有得到执行就报错吗

  2. 请叫我胡care 2015年9月23日 / 下午12:48

    我想问一下 那个 self.heartbeat() 是什么用处?我运行的时候 总是有条debug信息Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.

  3. aotian 2015年4月2日 / 下午10:33

    文章写的相当的不错,大赞。

  4. 运维小笨 2015年4月2日 / 上午7:13

    只是fork么?

发表评论

邮箱地址不会被公开。 必填项已用*标注