前言:
跟大家在群里聊了关于apscheduler的一些话题,大家尤其对他的时间管理感兴趣, 趁现在有空,花点时间做个分享。 其实在以前是写过apscheduler的源代码分析文章,写得太泛了,这次主要着重于定时器的相关逻辑。 python下的定时任务框架有那么几个选择,像sched,celery,apscheduler都可以做,相对来说功能和扩展最好的当属apscheduler了。 话说,我曾经还给apscheduler提交过一个crontab 格式的扩展,最后没merge,原因不明….
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=4228
首先从一个测试说起,代码如下:
# xiaorui.cc
# coding:utf-8
from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
def new():
print('this is xiaorui.cc time %s' % datetime.now())
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'interval', seconds=11) # 第一个任务
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.add_job(new, 'interval', seconds=1) # 第二个任务
while True:
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
执行的结果确实是我们想看到的,很理想 !
那么我们应该去思考下,Apscheduler是如何管理这些定时器的 ? 我曾经写过各种各样的定时器, 用redis zset实现定时器,用gevent实现过, 用小堆实现过, 用epoll timerfd也实现过… 有兴趣的朋友可以看看我的github和博文.
追了一下代码发现 apscheduler 默认是没有采用这些方法, 而使用了一个更加简单的方法.
# xiaorui.cc
class BackgroundScheduler(BlockingScheduler):
_thread = None
def start(self, *args, **kwargs):
self._event = Event()
BaseScheduler.start(self, *args, **kwargs)
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.daemon = self._daemon
self._thread.start()
class BlockingScheduler(BaseScheduler):
_event = None
def _main_loop(self):
wait_seconds = TIMEOUT_MAX
while self.state != STATE_STOPPED:
self._event.wait(wait_seconds) # 等待时间, 这个线程专门用来管理定时器时间的.
self._event.clear()
wait_seconds = self._process_jobs()
def wakeup(self):
self._event.set() # 唤醒,避免过长的休眠
def _process_jobs(self):
if self.state == STATE_PAUSED:
...
now = datetime.now(self.timezone) # 当前时间,这个很重要 !
next_wakeup_time = None
events = []
with self._jobstores_lock:
for jobstore_alias, jobstore in six.iteritems(self._jobstores): # 把当前任务给拿出来.
try:
due_jobs = jobstore.get_due_jobs(now) # 计算是否到了执行时间
except Exception as e:
# Schedule a wakeup at least in jobstore_retry_interval seconds # 默认是 小于 10s 的
self._logger.warning('Error getting due jobs from job store %r: %s',
jobstore_alias, e)
retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval) # self.jobstore_retry_interval = float(config.pop('jobstore_retry_interval', 10))
if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
next_wakeup_time = retry_wakeup_time
continue
for job in due_jobs:
... ...
run_times = job._get_run_times(now)
run_times = run_times[-1:] if run_times and job.coalesce else run_times
if run_times:
try:
executor.submit_job(job, run_times) # 提交这个任务
... ...
jobstore_next_run_time = jobstore.get_next_run_time() #获取下次执行的时间
if jobstore_next_run_time and (next_wakeup_time is None or jobstore_next_run_time < next_wakeup_time):
next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone) # 判断计算最近的任务时间.
# Dispatch collected events
for event in events:
self._dispatch_event(event)
if self.state == STATE_PAUSED: # 如果被暂停了,那么当然就不用wait了
wait_seconds = None
elif next_wakeup_time is None:
wait_seconds = None
else:
wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0) # 当然不能是0秒
self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
wait_seconds)
return wait_seconds
在这里我们知道他会wait这个事件,超时的时间为所有定时器中最小的那个。。。 举个例子现在是1秒, 有三个任务分别是10秒、15秒、20秒后执行? 那么我们就再等待 10 – 1 秒。
那么这里会有一个效率的问题 ! 当job列表任务很多的时候,难道每次都O(N)的时间复杂度计算下次event等待时间?
当然不可能这么粗暴 !
Apscheduler 使用jobstore为memery模式时候,会构建一个列表来用存放任务,当我们 add_job 添加一个新任务的时候,这个列表按时间排序存放,可以理解为有序列表,查找存放的点当然是二分查找算法了, 他的时间复杂度是 O(logN) . 这样实现的效果跟我前面说的小堆是一样的.
def add_job(self, job):
if job.id in self._jobs_index:
raise ConflictingIdError(job.id)
timestamp = datetime_to_utc_timestamp(job.next_run_time)
index = self._get_job_index(timestamp, job.id)
self._jobs.insert(index, (job, timestamp))
self._jobs_index[job.id] = (job, timestamp)
def _get_job_index(self, timestamp, job_id):
lo, hi = 0, len(self._jobs)
timestamp = float('inf') if timestamp is None else timestamp
while lo < hi:
mid = (lo + hi) // 2
mid_job, mid_timestamp = self._jobs[mid]
mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp
if mid_timestamp > timestamp:
hi = mid
elif mid_timestamp < timestamp:
lo = mid + 1
elif mid_job.id > job_id:
hi = mid
elif mid_job.id < job_id:
lo = mid + 1
else:
return mid
return lo
如果 apscheduler的定时任务已经跑起来了, 这时候你新加了一个定时周期任务? 但是你这边还在wait? 不要担心,当你调用add_job时, _real_add_job会帮你唤醒的. 当被唤醒了后,Apscheduler又继续寻找到期任务及等待时间。
def _real_add_job(self, job, jobstore_alias, replace_existing):
if self.state == STATE_RUNNING:
self.wakeup()
Apscheduler扩展了很多的jobstore存储方案,那么我们挑几个看看他们的实现方法.
file: jobstores/redis.py
def get_due_jobs(self, now):
timestamp = datetime_to_utc_timestamp(now)
job_ids = self.redis.zrangebyscore(self.run_times_key, 0, timestamp)
if job_ids:
job_states = self.redis.hmget(self.jobs_key, *job_ids)
...
def add_job(self, job):
if self.redis.hexists(self.jobs_key, job.id):
raise ConflictingIdError(job.id)
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
self.pickle_protocol))
if job.next_run_time:
pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
pipe.execute()
当然mysql也是依赖时间戳的概念,也是需要有排序的。
def get_due_jobs(self, now):
timestamp = datetime_to_utc_timestamp(now)
return self._get_jobs({'next_run_time': {'$lte': timestamp}})
def add_job(self, job):
try:
self.collection.insert({
'_id': job.id,
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
})
except DuplicateKeyError:
raise ConflictingIdError(job.id)
就先这样吧,. 到此为止我们差不多得知apscheduler的scheduler和job的设计,下次有时间再来分析下apscheduler的其他组件源码。
END.
