源码分析python apscheduler的实现

今天的话题是,源码分析python apscheduler计划任务的实现.  不知道为什么总是跟计划任务较真,自己一些实现了一个分布式的定时任务系统,也在博客中做过分享,但还是很喜欢看看别人是怎么写的,有什么可以借鉴的。 

新版的apscheduler 3.1.x有些太杂乱了,在3.x版本里面加了各种的运行模式,比如gevent,多进程啥的. 如果只是想看他的实现原理那我们就直接看他的经典版本2.1版本。


文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新. http://xiaorui.cc/2016/01/10/%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90python-apscheduler%E7%9A%84%E5%AE%9E%E7%8E%B0/

那可能有人问了,你丫怎么知道apscheduler的经典版是2.x? 我用apscheduler真的是很久以前了. 曾经用apschedule加tornado开发对内的计划任务系统。 


我们首先把apscheduler的代码切到2.1的commit.  

git clone https://bitbucket.org/agronholm/apscheduler.git
cd apscheduler
git checkout 3a4a0a4

下面是apscheduler的2.1版本的测试代码,看起来很是简单。他的执行效果也会是字面理解的那种,每隔一段时间去执行tick和h5函数,不会出现堵塞情况.  

#blog: xiaorui.cc

import sys
import time
from datetime import datetime

from apscheduler.scheduler import Scheduler

def tick():
    sys.stdout.write('Tick! The time is: %s\n' % datetime.now())

def h5():
    time.sleep(5)
    sys.stdout.write("this is h5 func")

if __name__ == '__main__':
    scheduler = Scheduler()
    scheduler.add_interval_job(tick, seconds=2)
    scheduler.add_interval_job(h5, seconds=1)
    sys.stdout.write('Press Ctrl+C to exit\n')
    scheduler.start()

废话少说,现在开始按调度步骤来分析下apscheduler的源代码. apscheduler的代码没有用到python中的黑魔法,实现的方案也比较的中规中矩。 
主要就利用了python threading Event 和 Lock锁。

scheduler = Scheduler() 

首先我们需要实例化Scheduler类,这个类初始化了必要的一些结构.

scheduler.add_interval_job(tick, seconds=2) 

添加一条任务,apscheduler首先会把这条任务用Job来包装下。 

job = Job(trigger, func, args or [], kwargs or {},
          options.pop('misfire_grace_time', self.misfire_grace_time),
          options.pop('coalesce', self.coalesce), **options)
if not self.running:
    self._pending_jobs.append((job, jobstore))
else:
    self._real_add_job(job, jobstore, True)
return job

Apscheduler定时任务有三种,一种是add_date_job,传递是datetime类型的时间对象。  add_interval_job是循环执行, 可以秒,分,时,天,星期.  add_cron_job跟前面的add_interval_job有些像,但他的场景是单次.


总的来说把一堆的任务加入到 self._pending_jobs, self._jobstores, self._listeners之后,开始正经干活.

def start(self):
    for job, jobstore in self._pending_jobs:
        self._real_add_job(job, jobstore, False)
    del self._pending_jobs[:]

    self._stopped = False
    self._thread = Thread(target=self._main_loop, name='APScheduler')
    self._thread.setDaemon(self.daemonic)
    self._thread.start()

def _main_loop(self):
    self._wakeup.clear()
    while not self._stopped:
        logger.debug('Looking for jobs to run')
        now = datetime.now()
        next_wakeup_time = self._process_jobs(now)

        if next_wakeup_time is not None:
            wait_seconds = time_difference(next_wakeup_time, now)
            logger.debug('Next wakeup is due at %s (in %f seconds)',
                         next_wakeup_time, wait_seconds)

            #会一直wait等待.
            self._wakeup.wait(wait_seconds)
        else:
            logger.debug('No jobs; waiting until a job is added')
            self._wakeup.wait()
        self._wakeup.clear()

    logger.info('Scheduler has been shut down')

这个函数主要是用来判断任务是否到了执行的时间. 需要主要的是,在实例化Scheduler类的时候会创建一组默认线程数为20的线程池。这个线程池用来执行真正的任务。
如下面的意思是,时间一到就立马提交给submit去调度任务.

#blog: xiaorui.cc

if run_times:
    self._threadpool.submit(self._run_job, job, run_times)


def _process_jobs(self, now):
    next_wakeup_time = None
    self._jobstores_lock.acquire()
    try:
        for alias, jobstore in iteritems(self._jobstores):
            for job in tuple(jobstore.jobs):
                run_times = job.get_run_times(now)
                if run_times:
                    self._threadpool.submit(self._run_job, job, run_times)


Apscheduler的代码解析就说到这边了,其实代码本身很是简练,值得大家好好琢磨下. 


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注