使用python多线程threading的timer实现任务定时器

跟同事聊起了delayqueue的实现delayqueue是啥? 听着名字就知道是延迟队列,也可以理解为是基于时间的定时任务队列. 

delayqueue又有什么用处? 
比如你维护了一个连接池,但是因为各种原因,你需要把这一堆的链接超过5分钟的长连接给干掉,重新创建个新的,然后塞入队列里面…
比如你维护了cache server,你又不想去被动更新缓存,而是想主动更新cache的内容,这时候,你怎么知道哪些缓存的key过期了,需要重新从hbase里面scan,然后塞入到cache里面。 

delayqueue要实现的东西其实挺多的,咱们可以变相的实现一个定时器 timer…  也就是让系统定时的去执行某个任务… … 

文章的原文地址,http://xiaorui.cc/?p=1721
先说一个坑… 在stackoverflow看到一个运行的demo,啥都没想就跑了….. 

import time
import threading

def hello(s):
    print s

key = “xiaorui.cc”

for i in range(3):
    t = threading.Timer(3.0, hello(key,))
    t.start()

Exception in thread Thread-3:
Traceback (most recent call last):
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
    self.run()
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
    self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable

Exception in thread Thread-1:
Traceback (most recent call last):
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
    self.run()
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
    self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable

Exception in thread Thread-2:
Traceback (most recent call last):
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
    self.run()
  File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
    self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable

这是能跑起来的代码….   还是看threading的timer源码才发现的.

import time
import threading

def hello(s):
    print s

key = "xiaorui.cc"

for i in range(3):
    t = threading.Timer(3.0, hello,[key])
    t.start()

[ruifengyun@devops delay ]$ python b.py
xiaorui.cc
xiaorui.cc
xiaorui.cc

下面是Timer的源码实现,实现的方式很简单,收到一个任务后,我立即new一个线程,线程逻辑里面最前面插入sleep…  等你时间消耗没了,你需要运行的函数自然而然也就执行了。 

def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=[], kwargs={}):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

我这边又简单加了一层装饰器,这样能方便的让实时的任务改成定时器去执行…   先前说了,python threading timer的实现有些丑陋.. 应该说是太简单了
每个任务都new一个线程,线程一多,cpu的上下文切换也是个成本…   切记,如果你的任务太多,一定更不要用Timer…. 

from threading import Timer

def delayed(seconds):
    def decorator(f):
        def wrapper(*args, **kargs):
            t = Timer(seconds, f, args, kargs)
            t.start()
        return wrapper
    return decorator

@delayed(3)
def xiaorui():
    print "xiaorui.cc"

for i in range(10):
    xiaorui()

总结:
    python这个timer定时器太简单了….  也就可以简单的一用…


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注