分析apscheduler的定时调度器源码

前言:

跟大家在群里聊了关于apscheduler的一些话题,大家尤其对他的时间管理感兴趣, 趁现在有空,花点时间做个分享。  其实在以前是写过apscheduler的源代码分析文章,写得太泛了,这次主要着重于定时器的相关逻辑。  python下的定时任务框架有那么几个选择,像sched,celery,apscheduler都可以做,相对来说功能和扩展最好的当属apscheduler了。   话说,我曾经还给apscheduler提交过一个crontab 格式的扩展,最后没merge,原因不明….  

该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.   http://xiaorui.cc/?p=4228

首先从一个测试说起,代码如下:

# xiaorui.cc
# coding:utf-8

from datetime import datetime
import time
import os

from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

def new():
    print('this is xiaorui.cc time %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=11)  # 第一个任务
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.add_job(new, 'interval', seconds=1)   # 第二个任务
        while True:
            time.sleep(3)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

执行的结果确实是我们想看到的,很理想  !  

那么我们应该去思考下,Apscheduler是如何管理这些定时器的 ?  我曾经写过各种各样的定时器, 用redis zset实现定时器,用gevent实现过, 用小堆实现过, 用epoll timerfd也实现过…  有兴趣的朋友可以看看我的github和博文. 

追了一下代码发现 apscheduler 默认是没有采用这些方法, 而使用了一个更加简单的方法. 

# xiaorui.cc

class BackgroundScheduler(BlockingScheduler):
    _thread = None

    def start(self, *args, **kwargs):
        self._event = Event()
        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.daemon = self._daemon
        self._thread.start()

class BlockingScheduler(BaseScheduler):
    _event = None

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)  # 等待时间, 这个线程专门用来管理定时器时间的.
            self._event.clear()
            wait_seconds = self._process_jobs()

    def wakeup(self):
        self._event.set()  # 唤醒,避免过长的休眠



def _process_jobs(self):
        if self.state == STATE_PAUSED:
            ...

        now = datetime.now(self.timezone)  # 当前时间,这个很重要 !
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            for jobstore_alias, jobstore in six.iteritems(self._jobstores):  # 把当前任务给拿出来.
                try:
                    due_jobs = jobstore.get_due_jobs(now)  #  计算是否到了执行时间
                except Exception as e:
                    # Schedule a wakeup at least in jobstore_retry_interval seconds   # 默认是 小于 10s 的
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)  # self.jobstore_retry_interval = float(config.pop('jobstore_retry_interval', 10))
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time

                    continue

                for job in due_jobs:
                    ... ...

                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            executor.submit_job(job, run_times)  # 提交这个任务
                        ... ...


                jobstore_next_run_time = jobstore.get_next_run_time()  #获取下次执行的时间
                if jobstore_next_run_time and (next_wakeup_time is None or jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)  # 判断计算最近的任务时间.

        # Dispatch collected events
        for event in events:
            self._dispatch_event(event)

        if self.state == STATE_PAUSED:  # 如果被暂停了,那么当然就不用wait了
            wait_seconds = None
        elif next_wakeup_time is None:
            wait_seconds = None
        else:
            wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0)  # 当然不能是0秒 
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)

        return wait_seconds

在这里我们知道他会wait这个事件,超时的时间为所有定时器中最小的那个。。。   举个例子现在是1秒, 有三个任务分别是10秒、15秒、20秒后执行? 那么我们就再等待 10 – 1 秒。

那么这里会有一个效率的问题 !  当job列表任务很多的时候,难道每次都O(N)的时间复杂度计算下次event等待时间? 

当然不可能这么粗暴  !     

Apscheduler 使用jobstore为memery模式时候,会构建一个列表来用存放任务,当我们 add_job 添加一个新任务的时候,这个列表按时间排序存放,可以理解为有序列表,查找存放的点当然是二分查找算法了, 他的时间复杂度是 O(logN) .   这样实现的效果跟我前面说的小堆是一样的.   

def add_job(self, job):
    if job.id in self._jobs_index:
        raise ConflictingIdError(job.id)

    timestamp = datetime_to_utc_timestamp(job.next_run_time)
    index = self._get_job_index(timestamp, job.id)
    self._jobs.insert(index, (job, timestamp))
    self._jobs_index[job.id] = (job, timestamp)

def _get_job_index(self, timestamp, job_id):
    lo, hi = 0, len(self._jobs)
    timestamp = float('inf') if timestamp is None else timestamp
    while lo < hi:
        mid = (lo + hi) // 2
        mid_job, mid_timestamp = self._jobs[mid]
        mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp
        if mid_timestamp > timestamp:
            hi = mid
        elif mid_timestamp < timestamp:
            lo = mid + 1
        elif mid_job.id > job_id:
            hi = mid
        elif mid_job.id < job_id:
            lo = mid + 1
        else:
            return mid

    return lo


如果 apscheduler的定时任务已经跑起来了, 这时候你新加了一个定时周期任务?  但是你这边还在wait?  不要担心,当你调用add_job时, _real_add_job会帮你唤醒的.  当被唤醒了后,Apscheduler又继续寻找到期任务及等待时间。 

def _real_add_job(self, job, jobstore_alias, replace_existing):
    if self.state == STATE_RUNNING:
        self.wakeup()

Apscheduler扩展了很多的jobstore存储方案,那么我们挑几个看看他们的实现方法. 

file: jobstores/redis.py

def get_due_jobs(self, now):
    timestamp = datetime_to_utc_timestamp(now)
    job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp)
    if job_ids:
        job_states = self.redis.hmget(self.jobs_key, *job_ids)
        ...

def add_job(self, job):
    if self.redis.hexists(self.jobs_key, job.id):
        raise ConflictingIdError(job.id)

    with self.redis.pipeline() as pipe:
        pipe.multi()
        pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
                                                      self.pickle_protocol))
        if job.next_run_time:
            pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
        pipe.execute()

当然mysql也是依赖时间戳的概念,也是需要有排序的。

def get_due_jobs(self, now):
    timestamp = datetime_to_utc_timestamp(now)
    return self._get_jobs({'next_run_time': {'$lte': timestamp}})

def add_job(self, job):
    try:
        self.collection.insert({
            '_id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        })
    except DuplicateKeyError:
        raise ConflictingIdError(job.id)

就先这样吧,.  到此为止我们差不多得知apscheduler的scheduler和job的设计,下次有时间再来分析下apscheduler的其他组件源码。

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注