记得写过一篇文章介绍了常见的定时任务实现方法,有sched,APScheduler, tornado等. 有兴趣的朋友可以找找.
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。
首先我们使用tornado ioloop的PeriodicCallback实现计划任务. 下面是例子代码.
#blog: xiaorui.cc
from tornado import web, ioloop
import datetime
period = 5 * 1000 # every 5 s
class MainHandler(web.RequestHandler):
def get(self):
self.write('Hello xiaorui.cc')
def like_cron():
print datetime.datetime.now()
def xiaorui():
print 'call xiaorui.cc 2s'
def lee():
print 'call lee 3s'
if __name__ == '__main__':
application = web.Application([
(r'/', MainHandler),
])
application.listen(8081)
ioloop.PeriodicCallback(like_cron, period).start() # start scheduler
ioloop.PeriodicCallback(lee, 3000).start() # 这里的时间是毫秒
ioloop.IOLoop.instance().start()
我们来瞅瞅ioloop的PeriodicCallback源码 , PeriodicCallback构造函数可以接收三个参数,一个是回调的函数,一个是间隔的时间,还有一个是io_loop, 一般io_loop为None就可以了,后面会调用当前的IO_LOOP.
PeriodicCallback的源码足够的简单,就是传递回调函数及间隔时间后,调用start()来触发该任务. 当运行到self._schedule_next()的时候,
该函数会判断任务是在running状态,接着会计算下次任务的运行时间,接着会 self.io_loop.add_timeout(self._next_timeout, self._run) ,在io_loop里添加了等待会事件及回调函数,这里的函数是self._run, 当事件触发后会运行_run()函数.
在_run函数里面我们传递的回调函数会被执行的。 那怎么让这任务不停的跑,而不是一遍就放弃了. _run()的最后一行self._schedule_next(),再次递归的调用_schedule_next()。 这就产生了任务的循环。
我们想暂停任务只要把self._running = False 就可以了,另外需要在ioloop里面把该timeout事件取消掉.
#blog: xiaorui.cc
class PeriodicCallback(object):
"""Schedules the given callback to be called periodically.
The callback is called every ``callback_time`` milliseconds.
`start` must be called after the `PeriodicCallback` is created.
"""
#回调函数,间隔时间
def __init__(self, callback, callback_time, io_loop=None):
self.callback = callback
if callback_time <= 0:
raise ValueError("Periodic callback must have a positive callback_time")
self.callback_time = callback_time
self.io_loop = io_loop or IOLoop.current()
self._running = False
self._timeout = None
# 任务开始
def start(self):
"""Starts the timer."""
self._running = True
self._next_timeout = self.io_loop.time()
self._schedule_next()
#任务关闭, 把_running改成False,并且在io_loop里去掉事件.
def stop(self):
"""Stops the timer."""
self._running = False
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
#执行回调函数.
def _run(self):
if not self._running:
return
try:
return self.callback()
except Exception:
self.io_loop.handle_callback_exception(self.callback)
finally:
self._schedule_next()
#计算下次的执行时间,并往ioloop里添加timeout事件.
def _schedule_next(self):
if self._running:
current_time = self.io_loop.time()
while self._next_timeout <= current_time:
#这里把毫秒改成秒,有没有蛋疼的感觉。 一开始传递个秒不就行了么?
self._next_timeout += self.callback_time / 1000.0
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
如果你想在Tornado PeriodicCallback实现crontab的格式,怎么办? 通过上面的源码解析我们可以直接改源码,只需要在_schedule_next里加入crontab解析就可以了,但话说回来要直接改掉源码是不优美的.
在github中看到一老外的tornado crontab的代码,源码有些废话不适合理解,下面是我精简后的伪代码.
tornado crontab有两块代码,一个是crontab装饰器,另一个是CronTabCallback类.
首先我们会通过def crontab装饰器把任务丢给CronTabCallback类。
那么CronTabCallback又是做啥的?CronTabCallback会继承ioloop的PeriodicCallback类。 _calc_callbacktime 这个函数是用来解析下次的任务执行时间,我们会发现他是用Crontab来解析时间的。
另外CronTabCallback也存在_run及_schedule_next函数.CronTabCallback里的_run其实只是做了个日志记录功能,完全可以去掉. _schedule_next是由父类的_run调用的.
那么整个流程就通了,创建任务的时候是使用super初始化父类, 我们知道PeriodicCallback会不停的调用_schedule_next,而感觉python Mro类继承中方法的搜索顺序,会首先调用之类的_schedule_next.
源码, https://github.com/gaujin/tornado-crontab/blob/master/tornado_crontab/_crontab.py
from crontab import CronTab
from tornado.ioloop import PeriodicCallback
#Blog: xiaorui.cc class CronTabCallback(PeriodicCallback):
def __init__(self, callback, schedule, io_loop=None):
self.__crontab = CronTab(schedule)
super(CronTabCallback, self).__init__(
callback, self._calc_callbacktime(), io_loop)
self.pid = os.getpid()
self.user = (os.environ.get("USERNAME")
if os.name == "nt" else os.getlogin())
def _calc_callbacktime(self, now=None):
return math.ceil(self.__crontab.next(now)) * 1000.0
def _logging(self, level):
if self._running and log_crontab.isEnabledFor(level):
_func, _args, _kwargs = self._get_func_spec()
log_crontab.log(level,
FORMAT_LOG_CRONTAB % dict(pid=self.pid,
user=self.user,
funcname=_func.__name__,
args=_args,
kwargs=_kwargs))
def _run(self):
self._logging(logging.INFO)
try:
PeriodicCallback._run(self)
finally:
self._logging(logging.DEBUG)
def _schedule_next(self):
self.callback_time = self._calc_callbacktime()
super(CronTabCallback, self)._schedule_next()
def crontab(schedule, io_loop=None):
def receive_func(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
_func = functools.partial(func, *args, **kwargs)
CronTabCallback(_func, schedule, io_loop).start()
return wrapper
return receive_func
Tornado PeriodicCallback每次_run触发任务时是阻塞行为的,所以应尽量避免阻塞的行为.
我写这篇文章是为了分享tornado的一些实现,其实我自己不推荐把一些不相关的任务放在tornado里运行的. 要知道阻塞的计划任务会影响tornado性能.
END.
