跟同事聊起了delayqueue的实现delayqueue是啥? 听着名字就知道是延迟队列,也可以理解为是基于时间的定时任务队列.
delayqueue又有什么用处?
比如你维护了一个连接池,但是因为各种原因,你需要把这一堆的链接超过5分钟的长连接给干掉,重新创建个新的,然后塞入队列里面…
比如你维护了cache server,你又不想去被动更新缓存,而是想主动更新cache的内容,这时候,你怎么知道哪些缓存的key过期了,需要重新从hbase里面scan,然后塞入到cache里面。
delayqueue要实现的东西其实挺多的,咱们可以变相的实现一个定时器 timer… 也就是让系统定时的去执行某个任务… …
文章的原文地址,http://xiaorui.cc/?p=1721
先说一个坑… 在stackoverflow看到一个运行的demo,啥都没想就跑了…..
import time
import threading
def hello(s):
print s
key = “xiaorui.cc”
for i in range(3):
t = threading.Timer(3.0, hello(key,))
t.start()
Exception in thread Thread-3:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable
Exception in thread Thread-1:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable
Exception in thread Thread-2:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable
这是能跑起来的代码…. 还是看threading的timer源码才发现的.
import time import threading def hello(s): print s key = "xiaorui.cc" for i in range(3): t = threading.Timer(3.0, hello,[key]) t.start()
[ruifengyun@devops delay ]$ python b.py
xiaorui.cc
xiaorui.cc
xiaorui.cc
下面是Timer的源码实现,实现的方式很简单,收到一个任务后,我立即new一个线程,线程逻辑里面最前面插入sleep… 等你时间消耗没了,你需要运行的函数自然而然也就执行了。
def Timer(*args, **kwargs): """Factory function to create a Timer object. Timers call a function after a specified number of seconds: t = Timer(30.0, f, args=[], kwargs={}) t.start() t.cancel() # stop the timer's action if it's still waiting """ return _Timer(*args, **kwargs) class _Timer(Thread): """Call a function after a specified number of seconds: t = Timer(30.0, f, args=[], kwargs={}) t.start() t.cancel() # stop the timer's action if it's still waiting """ def __init__(self, interval, function, args=[], kwargs={}): Thread.__init__(self) self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.finished = Event() def cancel(self): """Stop the timer if it hasn't finished yet""" self.finished.set() def run(self): self.finished.wait(self.interval) if not self.finished.is_set(): self.function(*self.args, **self.kwargs) self.finished.set()
我这边又简单加了一层装饰器,这样能方便的让实时的任务改成定时器去执行… 先前说了,python threading timer的实现有些丑陋.. 应该说是太简单了
每个任务都new一个线程,线程一多,cpu的上下文切换也是个成本… 切记,如果你的任务太多,一定更不要用Timer….
from threading import Timer def delayed(seconds): def decorator(f): def wrapper(*args, **kargs): t = Timer(seconds, f, args, kargs) t.start() return wrapper return decorator @delayed(3) def xiaorui(): print "xiaorui.cc" for i in range(10): xiaorui()
总结:
python这个timer定时器太简单了…. 也就可以简单的一用…