今天的话题是,源码分析python apscheduler计划任务的实现. 不知道为什么总是跟计划任务较真,自己一些实现了一个分布式的定时任务系统,也在博客中做过分享,但还是很喜欢看看别人是怎么写的,有什么可以借鉴的。
新版的apscheduler 3.1.x有些太杂乱了,在3.x版本里面加了各种的运行模式,比如gevent,多进程啥的. 如果只是想看他的实现原理那我们就直接看他的经典版本2.1版本。
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新. http://xiaorui.cc/2016/01/10/%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90python-apscheduler%E7%9A%84%E5%AE%9E%E7%8E%B0/
那可能有人问了,你丫怎么知道apscheduler的经典版是2.x? 我用apscheduler真的是很久以前了. 曾经用apschedule加tornado开发对内的计划任务系统。
我们首先把apscheduler的代码切到2.1的commit.
git clone https://bitbucket.org/agronholm/apscheduler.git cd apscheduler git checkout 3a4a0a4
下面是apscheduler的2.1版本的测试代码,看起来很是简单。他的执行效果也会是字面理解的那种,每隔一段时间去执行tick和h5函数,不会出现堵塞情况.
#blog: xiaorui.cc import sys import time from datetime import datetime from apscheduler.scheduler import Scheduler def tick(): sys.stdout.write('Tick! The time is: %s\n' % datetime.now()) def h5(): time.sleep(5) sys.stdout.write("this is h5 func") if __name__ == '__main__': scheduler = Scheduler() scheduler.add_interval_job(tick, seconds=2) scheduler.add_interval_job(h5, seconds=1) sys.stdout.write('Press Ctrl+C to exit\n') scheduler.start()
废话少说,现在开始按调度步骤来分析下apscheduler的源代码. apscheduler的代码没有用到python中的黑魔法,实现的方案也比较的中规中矩。
主要就利用了python threading Event 和 Lock锁。
scheduler = Scheduler()
首先我们需要实例化Scheduler类,这个类初始化了必要的一些结构.
scheduler.add_interval_job(tick, seconds=2)
添加一条任务,apscheduler首先会把这条任务用Job来包装下。
job = Job(trigger, func, args or [], kwargs or {}, options.pop('misfire_grace_time', self.misfire_grace_time), options.pop('coalesce', self.coalesce), **options) if not self.running: self._pending_jobs.append((job, jobstore)) else: self._real_add_job(job, jobstore, True) return job
Apscheduler定时任务有三种,一种是add_date_job,传递是datetime类型的时间对象。 add_interval_job是循环执行, 可以秒,分,时,天,星期. add_cron_job跟前面的add_interval_job有些像,但他的场景是单次.
总的来说把一堆的任务加入到 self._pending_jobs, self._jobstores, self._listeners之后,开始正经干活.
def start(self): for job, jobstore in self._pending_jobs: self._real_add_job(job, jobstore, False) del self._pending_jobs[:] self._stopped = False self._thread = Thread(target=self._main_loop, name='APScheduler') self._thread.setDaemon(self.daemonic) self._thread.start() def _main_loop(self): self._wakeup.clear() while not self._stopped: logger.debug('Looking for jobs to run') now = datetime.now() next_wakeup_time = self._process_jobs(now) if next_wakeup_time is not None: wait_seconds = time_difference(next_wakeup_time, now) logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) #会一直wait等待. self._wakeup.wait(wait_seconds) else: logger.debug('No jobs; waiting until a job is added') self._wakeup.wait() self._wakeup.clear() logger.info('Scheduler has been shut down')
这个函数主要是用来判断任务是否到了执行的时间. 需要主要的是,在实例化Scheduler类的时候会创建一组默认线程数为20的线程池。这个线程池用来执行真正的任务。
如下面的意思是,时间一到就立马提交给submit去调度任务.
#blog: xiaorui.cc if run_times: self._threadpool.submit(self._run_job, job, run_times) def _process_jobs(self, now): next_wakeup_time = None self._jobstores_lock.acquire() try: for alias, jobstore in iteritems(self._jobstores): for job in tuple(jobstore.jobs): run_times = job.get_run_times(now) if run_times: self._threadpool.submit(self._run_job, job, run_times)
Apscheduler的代码解析就说到这边了,其实代码本身很是简练,值得大家好好琢磨下.
…