前言:
跟大家在群里聊了关于apscheduler的一些话题,大家尤其对他的时间管理感兴趣, 趁现在有空,花点时间做个分享。 其实在以前是写过apscheduler的源代码分析文章,写得太泛了,这次主要着重于定时器的相关逻辑。 python下的定时任务框架有那么几个选择,像sched,celery,apscheduler都可以做,相对来说功能和扩展最好的当属apscheduler了。 话说,我曾经还给apscheduler提交过一个crontab 格式的扩展,最后没merge,原因不明….
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=4228
首先从一个测试说起,代码如下:
# xiaorui.cc # coding:utf-8 from datetime import datetime import time import os from apscheduler.schedulers.background import BackgroundScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) def new(): print('this is xiaorui.cc time %s' % datetime.now()) if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(tick, 'interval', seconds=11) # 第一个任务 scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.add_job(new, 'interval', seconds=1) # 第二个任务 while True: time.sleep(3) except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
执行的结果确实是我们想看到的,很理想 !
那么我们应该去思考下,Apscheduler是如何管理这些定时器的 ? 我曾经写过各种各样的定时器, 用redis zset实现定时器,用gevent实现过, 用小堆实现过, 用epoll timerfd也实现过… 有兴趣的朋友可以看看我的github和博文.
追了一下代码发现 apscheduler 默认是没有采用这些方法, 而使用了一个更加简单的方法.
# xiaorui.cc class BackgroundScheduler(BlockingScheduler): _thread = None def start(self, *args, **kwargs): self._event = Event() BaseScheduler.start(self, *args, **kwargs) self._thread = Thread(target=self._main_loop, name='APScheduler') self._thread.daemon = self._daemon self._thread.start() class BlockingScheduler(BaseScheduler): _event = None def _main_loop(self): wait_seconds = TIMEOUT_MAX while self.state != STATE_STOPPED: self._event.wait(wait_seconds) # 等待时间, 这个线程专门用来管理定时器时间的. self._event.clear() wait_seconds = self._process_jobs() def wakeup(self): self._event.set() # 唤醒,避免过长的休眠 def _process_jobs(self): if self.state == STATE_PAUSED: ... now = datetime.now(self.timezone) # 当前时间,这个很重要 ! next_wakeup_time = None events = [] with self._jobstores_lock: for jobstore_alias, jobstore in six.iteritems(self._jobstores): # 把当前任务给拿出来. try: due_jobs = jobstore.get_due_jobs(now) # 计算是否到了执行时间 except Exception as e: # Schedule a wakeup at least in jobstore_retry_interval seconds # 默认是 小于 10s 的 self._logger.warning('Error getting due jobs from job store %r: %s', jobstore_alias, e) retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval) # self.jobstore_retry_interval = float(config.pop('jobstore_retry_interval', 10)) if not next_wakeup_time or next_wakeup_time > retry_wakeup_time: next_wakeup_time = retry_wakeup_time continue for job in due_jobs: ... ... run_times = job._get_run_times(now) run_times = run_times[-1:] if run_times and job.coalesce else run_times if run_times: try: executor.submit_job(job, run_times) # 提交这个任务 ... ... jobstore_next_run_time = jobstore.get_next_run_time() #获取下次执行的时间 if jobstore_next_run_time and (next_wakeup_time is None or jobstore_next_run_time < next_wakeup_time): next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone) # 判断计算最近的任务时间. # Dispatch collected events for event in events: self._dispatch_event(event) if self.state == STATE_PAUSED: # 如果被暂停了,那么当然就不用wait了 wait_seconds = None elif next_wakeup_time is None: wait_seconds = None else: wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0) # 当然不能是0秒 self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) return wait_seconds
在这里我们知道他会wait这个事件,超时的时间为所有定时器中最小的那个。。。 举个例子现在是1秒, 有三个任务分别是10秒、15秒、20秒后执行? 那么我们就再等待 10 – 1 秒。
那么这里会有一个效率的问题 ! 当job列表任务很多的时候,难道每次都O(N)的时间复杂度计算下次event等待时间?
当然不可能这么粗暴 !
Apscheduler 使用jobstore为memery模式时候,会构建一个列表来用存放任务,当我们 add_job 添加一个新任务的时候,这个列表按时间排序存放,可以理解为有序列表,查找存放的点当然是二分查找算法了, 他的时间复杂度是 O(logN) . 这样实现的效果跟我前面说的小堆是一样的.
def add_job(self, job): if job.id in self._jobs_index: raise ConflictingIdError(job.id) timestamp = datetime_to_utc_timestamp(job.next_run_time) index = self._get_job_index(timestamp, job.id) self._jobs.insert(index, (job, timestamp)) self._jobs_index[job.id] = (job, timestamp) def _get_job_index(self, timestamp, job_id): lo, hi = 0, len(self._jobs) timestamp = float('inf') if timestamp is None else timestamp while lo < hi: mid = (lo + hi) // 2 mid_job, mid_timestamp = self._jobs[mid] mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp if mid_timestamp > timestamp: hi = mid elif mid_timestamp < timestamp: lo = mid + 1 elif mid_job.id > job_id: hi = mid elif mid_job.id < job_id: lo = mid + 1 else: return mid return lo
如果 apscheduler的定时任务已经跑起来了, 这时候你新加了一个定时周期任务? 但是你这边还在wait? 不要担心,当你调用add_job时, _real_add_job会帮你唤醒的. 当被唤醒了后,Apscheduler又继续寻找到期任务及等待时间。
def _real_add_job(self, job, jobstore_alias, replace_existing): if self.state == STATE_RUNNING: self.wakeup()
Apscheduler扩展了很多的jobstore存储方案,那么我们挑几个看看他们的实现方法.
file: jobstores/redis.py
def get_due_jobs(self, now): timestamp = datetime_to_utc_timestamp(now) job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp) if job_ids: job_states = self.redis.hmget(self.jobs_key, *job_ids) ... def add_job(self, job): if self.redis.hexists(self.jobs_key, job.id): raise ConflictingIdError(job.id) with self.redis.pipeline() as pipe: pipe.multi() pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol)) if job.next_run_time: pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id) pipe.execute()
当然mysql也是依赖时间戳的概念,也是需要有排序的。
def get_due_jobs(self, now): timestamp = datetime_to_utc_timestamp(now) return self._get_jobs({'next_run_time': {'$lte': timestamp}}) def add_job(self, job): try: self.collection.insert({ '_id': job.id, 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) }) except DuplicateKeyError: raise ConflictingIdError(job.id)
就先这样吧,. 到此为止我们差不多得知apscheduler的scheduler和job的设计,下次有时间再来分析下apscheduler的其他组件源码。
END.