源码分析Tornado PeriodicCallback Crontab定时任务实现

记得写过一篇文章介绍了常见的定时任务实现方法,有sched,APScheduler, tornado等. 有兴趣的朋友可以找找. 

文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新。

http://xiaorui.cc/2016/01/20/%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90tornado-periodiccallback-crontab%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1%E5%AE%9E%E7%8E%B0/

首先我们使用tornado ioloop的PeriodicCallback实现计划任务. 下面是例子代码.

#blog: xiaorui.cc

from tornado import web, ioloop
import datetime

period = 5 * 1000   # every 5 s
class MainHandler(web.RequestHandler):
    def get(self):
        self.write('Hello xiaorui.cc')

def like_cron():
    print datetime.datetime.now()

def xiaorui():
    print 'call xiaorui.cc 2s'

def lee():
    print 'call lee 3s'
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
if __name__ == '__main__':
    application = web.Application([
        (r'/', MainHandler),
        ])
    application.listen(8081)
    ioloop.PeriodicCallback(like_cron, period).start()  # start scheduler
    ioloop.PeriodicCallback(lee, 3000).start()  # 这里的时间是毫秒
    ioloop.IOLoop.instance().start()

我们来瞅瞅ioloop的PeriodicCallback源码 , PeriodicCallback构造函数可以接收三个参数,一个是回调的函数,一个是间隔的时间,还有一个是io_loop, 一般io_loop为None就可以了,后面会调用当前的IO_LOOP.

PeriodicCallback的源码足够的简单,就是传递回调函数及间隔时间后,调用start()来触发该任务. 当运行到self._schedule_next()的时候,
该函数会判断任务是在running状态,接着会计算下次任务的运行时间,接着会 self.io_loop.add_timeout(self._next_timeout, self._run) ,在io_loop里添加了等待会事件及回调函数,这里的函数是self._run, 当事件触发后会运行_run()函数.
在_run函数里面我们传递的回调函数会被执行的。 那怎么让这任务不停的跑,而不是一遍就放弃了. _run()的最后一行self._schedule_next(),再次递归的调用_schedule_next()。 这就产生了任务的循环。

我们想暂停任务只要把self._running = False 就可以了,另外需要在ioloop里面把该timeout事件取消掉.

#blog: xiaorui.cc

class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every ``callback_time`` milliseconds.

    `start` must be called after the `PeriodicCallback` is created.
    """
     #回调函数,间隔时间
    def __init__(self, callback, callback_time, io_loop=None):
        self.callback = callback
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.io_loop = io_loop or IOLoop.current()
        self._running = False
        self._timeout = None
   # 任务开始
    def start(self):
        """Starts the timer."""
        self._running = True
        self._next_timeout = self.io_loop.time()
        self._schedule_next()
   #任务关闭, 把_running改成False,并且在io_loop里去掉事件.
    def stop(self):
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    #执行回调函数.
    def _run(self):
        if not self._running:
            return
        try:
            return self.callback()
        except Exception:
            self.io_loop.handle_callback_exception(self.callback)
        finally:
            self._schedule_next()

     #计算下次的执行时间,并往ioloop里添加timeout事件.
    def _schedule_next(self):
        if self._running:
            current_time = self.io_loop.time()
            while self._next_timeout <= current_time:
            #这里把毫秒改成秒,有没有蛋疼的感觉。 一开始传递个秒不就行了么? 
                self._next_timeout += self.callback_time / 1000.0
            self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

如果你想在Tornado PeriodicCallback实现crontab的格式,怎么办?  通过上面的源码解析我们可以直接改源码,只需要在_schedule_next里加入crontab解析就可以了,但话说回来要直接改掉源码是不优美的. 

在github中看到一老外的tornado crontab的代码,源码有些废话不适合理解,下面是我精简后的伪代码. 

tornado crontab有两块代码,一个是crontab装饰器,另一个是CronTabCallback类. 

首先我们会通过def crontab装饰器把任务丢给CronTabCallback类。

那么CronTabCallback又是做啥的?CronTabCallback会继承ioloop的PeriodicCallback类。 _calc_callbacktime 这个函数是用来解析下次的任务执行时间,我们会发现他是用Crontab来解析时间的。 
另外CronTabCallback也存在_run及_schedule_next函数.CronTabCallback里的_run其实只是做了个日志记录功能,完全可以去掉.  _schedule_next是由父类的_run调用的. 

那么整个流程就通了,创建任务的时候是使用super初始化父类, 我们知道PeriodicCallback会不停的调用_schedule_next,而感觉python Mro类继承中方法的搜索顺序,会首先调用之类的_schedule_next.    

源码, https://github.com/gaujin/tornado-crontab/blob/master/tornado_crontab/_crontab.py

from crontab import CronTab
from tornado.ioloop import PeriodicCallback

#Blog:  xiaorui.cc  class CronTabCallback(PeriodicCallback):

    def __init__(self, callback, schedule, io_loop=None):
        self.__crontab = CronTab(schedule)
        super(CronTabCallback, self).__init__(
            callback, self._calc_callbacktime(), io_loop)
        self.pid = os.getpid()
        self.user = (os.environ.get("USERNAME")
                     if os.name == "nt" else os.getlogin())

    def _calc_callbacktime(self, now=None):
        return math.ceil(self.__crontab.next(now)) * 1000.0

    def _logging(self, level):

        if self._running and log_crontab.isEnabledFor(level):

            _func, _args, _kwargs = self._get_func_spec()

            log_crontab.log(level,
                            FORMAT_LOG_CRONTAB % dict(pid=self.pid,
                                                      user=self.user,
                                                      funcname=_func.__name__,
                                                      args=_args,
                                                      kwargs=_kwargs))
    def _run(self):

        self._logging(logging.INFO)

        try:
            PeriodicCallback._run(self)

        finally:

            self._logging(logging.DEBUG)

    def _schedule_next(self):
        self.callback_time = self._calc_callbacktime()
        super(CronTabCallback, self)._schedule_next()


def crontab(schedule, io_loop=None):

    def receive_func(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):

            _func = functools.partial(func, *args, **kwargs)
            CronTabCallback(_func, schedule, io_loop).start()

        return wrapper
    return receive_func

Tornado PeriodicCallback每次_run触发任务时是阻塞行为的,所以应尽量避免阻塞的行为.    

我写这篇文章是为了分享tornado的一些实现,其实我自己不推荐把一些不相关的任务放在tornado里运行的. 要知道阻塞的计划任务会影响tornado性能.

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注