记得写过一篇文章介绍了常见的定时任务实现方法,有sched,APScheduler, tornado等. 有兴趣的朋友可以找找.
文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。
首先我们使用tornado ioloop的PeriodicCallback实现计划任务. 下面是例子代码.
#blog: xiaorui.cc from tornado import web, ioloop import datetime period = 5 * 1000 # every 5 s class MainHandler(web.RequestHandler): def get(self): self.write('Hello xiaorui.cc') def like_cron(): print datetime.datetime.now() def xiaorui(): print 'call xiaorui.cc 2s' def lee(): print 'call lee 3s' if __name__ == '__main__': application = web.Application([ (r'/', MainHandler), ]) application.listen(8081) ioloop.PeriodicCallback(like_cron, period).start() # start scheduler ioloop.PeriodicCallback(lee, 3000).start() # 这里的时间是毫秒 ioloop.IOLoop.instance().start()
我们来瞅瞅ioloop的PeriodicCallback源码 , PeriodicCallback构造函数可以接收三个参数,一个是回调的函数,一个是间隔的时间,还有一个是io_loop, 一般io_loop为None就可以了,后面会调用当前的IO_LOOP.
PeriodicCallback的源码足够的简单,就是传递回调函数及间隔时间后,调用start()来触发该任务. 当运行到self._schedule_next()的时候,
该函数会判断任务是在running状态,接着会计算下次任务的运行时间,接着会 self.io_loop.add_timeout(self._next_timeout, self._run) ,在io_loop里添加了等待会事件及回调函数,这里的函数是self._run, 当事件触发后会运行_run()函数.
在_run函数里面我们传递的回调函数会被执行的。 那怎么让这任务不停的跑,而不是一遍就放弃了. _run()的最后一行self._schedule_next(),再次递归的调用_schedule_next()。 这就产生了任务的循环。
我们想暂停任务只要把self._running = False 就可以了,另外需要在ioloop里面把该timeout事件取消掉.
#blog: xiaorui.cc class PeriodicCallback(object): """Schedules the given callback to be called periodically. The callback is called every ``callback_time`` milliseconds. `start` must be called after the `PeriodicCallback` is created. """ #回调函数,间隔时间 def __init__(self, callback, callback_time, io_loop=None): self.callback = callback if callback_time <= 0: raise ValueError("Periodic callback must have a positive callback_time") self.callback_time = callback_time self.io_loop = io_loop or IOLoop.current() self._running = False self._timeout = None # 任务开始 def start(self): """Starts the timer.""" self._running = True self._next_timeout = self.io_loop.time() self._schedule_next() #任务关闭, 把_running改成False,并且在io_loop里去掉事件. def stop(self): """Stops the timer.""" self._running = False if self._timeout is not None: self.io_loop.remove_timeout(self._timeout) self._timeout = None #执行回调函数. def _run(self): if not self._running: return try: return self.callback() except Exception: self.io_loop.handle_callback_exception(self.callback) finally: self._schedule_next() #计算下次的执行时间,并往ioloop里添加timeout事件. def _schedule_next(self): if self._running: current_time = self.io_loop.time() while self._next_timeout <= current_time: #这里把毫秒改成秒,有没有蛋疼的感觉。 一开始传递个秒不就行了么? self._next_timeout += self.callback_time / 1000.0 self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
如果你想在Tornado PeriodicCallback实现crontab的格式,怎么办? 通过上面的源码解析我们可以直接改源码,只需要在_schedule_next里加入crontab解析就可以了,但话说回来要直接改掉源码是不优美的.
在github中看到一老外的tornado crontab的代码,源码有些废话不适合理解,下面是我精简后的伪代码.
tornado crontab有两块代码,一个是crontab装饰器,另一个是CronTabCallback类.
首先我们会通过def crontab装饰器把任务丢给CronTabCallback类。
那么CronTabCallback又是做啥的?CronTabCallback会继承ioloop的PeriodicCallback类。 _calc_callbacktime 这个函数是用来解析下次的任务执行时间,我们会发现他是用Crontab来解析时间的。
另外CronTabCallback也存在_run及_schedule_next函数.CronTabCallback里的_run其实只是做了个日志记录功能,完全可以去掉. _schedule_next是由父类的_run调用的.
那么整个流程就通了,创建任务的时候是使用super初始化父类, 我们知道PeriodicCallback会不停的调用_schedule_next,而感觉python Mro类继承中方法的搜索顺序,会首先调用之类的_schedule_next.
源码, https://github.com/gaujin/tornado-crontab/blob/master/tornado_crontab/_crontab.py
from crontab import CronTab from tornado.ioloop import PeriodicCallback #Blog: xiaorui.cc class CronTabCallback(PeriodicCallback): def __init__(self, callback, schedule, io_loop=None): self.__crontab = CronTab(schedule) super(CronTabCallback, self).__init__( callback, self._calc_callbacktime(), io_loop) self.pid = os.getpid() self.user = (os.environ.get("USERNAME") if os.name == "nt" else os.getlogin()) def _calc_callbacktime(self, now=None): return math.ceil(self.__crontab.next(now)) * 1000.0 def _logging(self, level): if self._running and log_crontab.isEnabledFor(level): _func, _args, _kwargs = self._get_func_spec() log_crontab.log(level, FORMAT_LOG_CRONTAB % dict(pid=self.pid, user=self.user, funcname=_func.__name__, args=_args, kwargs=_kwargs)) def _run(self): self._logging(logging.INFO) try: PeriodicCallback._run(self) finally: self._logging(logging.DEBUG) def _schedule_next(self): self.callback_time = self._calc_callbacktime() super(CronTabCallback, self)._schedule_next() def crontab(schedule, io_loop=None): def receive_func(func): @functools.wraps(func) def wrapper(*args, **kwargs): _func = functools.partial(func, *args, **kwargs) CronTabCallback(_func, schedule, io_loop).start() return wrapper return receive_func
Tornado PeriodicCallback每次_run触发任务时是阻塞行为的,所以应尽量避免阻塞的行为.
我写这篇文章是为了分享tornado的一些实现,其实我自己不推荐把一些不相关的任务放在tornado里运行的. 要知道阻塞的计划任务会影响tornado性能.
END.