跟同事聊起了delayqueue的实现delayqueue是啥? 听着名字就知道是延迟队列,也可以理解为是基于时间的定时任务队列.
delayqueue又有什么用处?
比如你维护了一个连接池,但是因为各种原因,你需要把这一堆的链接超过5分钟的长连接给干掉,重新创建个新的,然后塞入队列里面…
比如你维护了cache server,你又不想去被动更新缓存,而是想主动更新cache的内容,这时候,你怎么知道哪些缓存的key过期了,需要重新从hbase里面scan,然后塞入到cache里面。
delayqueue要实现的东西其实挺多的,咱们可以变相的实现一个定时器 timer… 也就是让系统定时的去执行某个任务… …
文章的原文地址,http://xiaorui.cc/?p=1721
先说一个坑… 在stackoverflow看到一个运行的demo,啥都没想就跑了…..
import time
import threading
def hello(s):
print s
key = “xiaorui.cc”
for i in range(3):
t = threading.Timer(3.0, hello(key,))
t.start()
Exception in thread Thread-3:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable
Exception in thread Thread-1:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable
Exception in thread Thread-2:
Traceback (most recent call last):
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 810, in __bootstrap_inner
self.run()
File “/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py”, line 1082, in run
self.function(*self.args, **self.kwargs)
TypeError: ‘NoneType’ object is not callable
这是能跑起来的代码…. 还是看threading的timer源码才发现的.
import time
import threading
def hello(s):
print s
key = "xiaorui.cc"
for i in range(3):
t = threading.Timer(3.0, hello,[key])
t.start()
[ruifengyun@devops delay ]$ python b.py
xiaorui.cc
xiaorui.cc
xiaorui.cc
下面是Timer的源码实现,实现的方式很简单,收到一个任务后,我立即new一个线程,线程逻辑里面最前面插入sleep… 等你时间消耗没了,你需要运行的函数自然而然也就执行了。
def Timer(*args, **kwargs):
"""Factory function to create a Timer object.
Timers call a function after a specified number of seconds:
t = Timer(30.0, f, args=[], kwargs={})
t.start()
t.cancel() # stop the timer's action if it's still waiting
"""
return _Timer(*args, **kwargs)
class _Timer(Thread):
"""Call a function after a specified number of seconds:
t = Timer(30.0, f, args=[], kwargs={})
t.start()
t.cancel() # stop the timer's action if it's still waiting
"""
def __init__(self, interval, function, args=[], kwargs={}):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()
def cancel(self):
"""Stop the timer if it hasn't finished yet"""
self.finished.set()
def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
我这边又简单加了一层装饰器,这样能方便的让实时的任务改成定时器去执行… 先前说了,python threading timer的实现有些丑陋.. 应该说是太简单了
每个任务都new一个线程,线程一多,cpu的上下文切换也是个成本… 切记,如果你的任务太多,一定更不要用Timer….
from threading import Timer
def delayed(seconds):
def decorator(f):
def wrapper(*args, **kargs):
t = Timer(seconds, f, args, kargs)
t.start()
return wrapper
return decorator
@delayed(3)
def xiaorui():
print "xiaorui.cc"
for i in range(10):
xiaorui()
总结:
python这个timer定时器太简单了…. 也就可以简单的一用…
