使用PriorityQueue和heapq实现基于时间戳的时序优先级队列

最近在做一个基于业务的监控系统,当然还是python,他区别于基础监控和那种探测监控。 这次的stormMonitor更多的是scan扫描库和实时的统计…  

scan扫描库是,定时的统计计算上个小时的各种数据,不管是从mysql或者是hbase….  

实时统计是,线上的几大核心模块自主发出metric信息到mq里面,然后我这边会实时计算..   原本是计划是用kafka和storm,这种标配的流式计算组合别的部分在用,但我们的量不是很大,还有就是实时性不是很高,每个小时就可以了…. 其实最主要的是,storm的python接口不是很友善,最少我看了半天有些一头雾水,居然有催眠的感觉…. 倒是kafka有些熟悉….. 放弃了….. 所以直接加层中间件往redis里面做incr +1 ….   


那么问题来了,这根我今天说的用python事先基于时间戳的优先级队列又有毛关系呐?  毛关系呐?其实对于这次的title定义的有些纠结….  时序队列是啥?    优先级队列又是啥?    

我不知道有人用过redis sorted set没,他其实就是做成时序队列, 我们可以插入时间戳做key,然后把任务做成value…   为什么不直接把扔到redis list队列里面。 然后有个后端的daemon后端程序一直pop任务….  但是你有没有想过像rq,celery这样的任务队列,他是如何搞定的定时执行任务….  比如还有我以前用过的场景,在开发大型监控系统的时候,尤其在数据图表展现的时候,什么会造成你的图表不及时出来?  很有可能是你的数据聚合没有做好….   一般来说聚合就是用时序队列做的….    因为你要一直监听时序队列,看看有没有要计算的任务….  而往往这些聚会任务都是1分钟,5分钟,15分钟的执行…  在乐视的时候,往往有2w+台server要监控,再加上一堆的openstack的虚拟机,然后每个主机最少有10个监控项目,那么就是5w个监控项目,然后每个监控项目都要做聚合,那就是15w*3  ,那就是45w个时间点,每个时间点就是一个数据聚合计算,可以求平均值,max,min….     这就是时序队列的场景.    额外说下,如果你的任务过多,就像我刚才描述的监控场景….   那么我还是建议你用redis,扩展成分布式,如果单redis不搞用,那么你可以用一致性hash切开任务….

说明下文章的原文出处…  http://xiaorui.cc/?p=1707

http://xiaorui.cc/2015/07/01/python%E4%BD%BF%E7%94%A8priorityqueue%E5%92%8Cheapq%E5%AE%9E%E7%8E%B0%E5%9F%BA%E4%BA%8E%E6%97%B6%E9%97%B4%E6%88%B3%E7%9A%84%E6%97%B6%E5%BA%8F%E4%BC%98%E5%85%88%E7%BA%A7%E9%98%9F%E5%88%97/

终于把关于时序队列的事扯完了….. 优先级又是什么?  我们往往可以把一堆的任务通过他的热度,或者是紧急情况进行top分层…..    python rq任务队列的优先级实现的方式,我上次有讲过…    他实现的方式微微有些取巧..   是定义了三个队列,high,middle,low    一看名字就知道他们各自的优先级等级… 

那么rq客户端取任务的时候,是如何取得? 他用了for循环,先pop 高队列,没有的话,再去pop middle queue….  然后,你就懂了…..     今天说了这么多废话,其实就是想我们自己搞定他的是顺序,而不是去读三次….    


from Queue import PriorityQueue
import time
q = PriorityQueue()
for i in xrange(10):
    q.put((int(time.time()), 'xx'))
    time.sleep(1)
while  not q.empty():
    print q.get()

(1435764197, ‘xx’)
(1435764198, ‘xx’)
(1435764199, ‘xx’)
(1435764200, ‘xx’)
(1435764201, ‘xx’)
(1435764202, ‘xx’)
(1435764203, ‘xx’)
(1435764204, ‘xx’)
(1435764205, ‘xx’)
(1435764206, ‘xx’)

我们来看下Priorityqueue的实现

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self.queue)

下面是相关的函数…  自己看吧….  我的场景就那些..

Methods defined here:  

__add__(self, x)
Overload syntax: pq = pq + x is the same as pq.add(x)
__bool__(self)
Determine the truth of a priority queue: non-empty priority queues are True
__init__(self, inital_contents=[], key=<function <lambda>>, reverse=False)
Priority queue is constructed to store initial_contents (it can be any iterable),
  with key/reverse used for computing relative priorities (as in sorting).
__iter__(self)
Implement the constructor for the iterator protocol
__next__(self)
Implement next for the iterator protocol
__str__(self)
Return a string representation of a priority queue
add(self, v)
Add value v to the priority queue
clear(self)
Clear the priority queue
is_empty(self)
Return whether the priority queue is empty
merge(self, other)
Merge other (a PriorityQueue) with this one (clearing otherPriorityQueue)
peek(self)
Return (but do not remove) the highest priority value in the priority queue
remove(self)
Remove and return the highest priority value (by key/reverse) in the priority queue
size(self)
Return the number of values in the priority queue

还有就是通过python queue的PriorityQueue小片代码,我们会发现他是用heapq实现的,主要是用到heapq.heappush和heapq.heappop ,heapq模块是啥? 他实现了一个适用于Python列表的最小堆排序算法 。是一个最小堆,堆顶元素 a[0] 永远是最小的. 和 Java 中的优先队列类似.


heapq模块提供了如下几个函数:

#xiaorui.cc

heapq.heappush(heap, item) 把item添加到heap中(heap是一个列表)
heapq.heappop(heap) 把堆顶元素弹出,返回的就是堆顶
heapq.heappushpop(heap, item) 先把item加入到堆中,然后再pop,比heappush()再heappop()要快得多
heapq.heapreplace(heap, item) 先pop,然后再把item加入到堆中,比heappop()再heappush()要快得多
heapq.heapify(x) 将列表x进行堆调整,默认的是小顶堆
heapq.merge(*iterables) 将多个列表合并,并进行堆调整,返回的是合并后的列表的迭代器
heapq.nlargest(n, iterable, key=None) 返回最大的n个元素(Top-K问题)
heapq.nsmallest(n, iterable, key=None) 返回最小的n个元素(Top-K问题)

总结下:

        没啥好总结的,不知道为毛,博客里面光优先级队列的文章就好几篇….  自从来了新公司后,发现关心业务的东西没以前多了,工作的核心更多的是偏向于调度和性能上了.. 尤其是在分布式的场景下…. 

马上要7.2号了,祝好…..    xiaorui.cc





大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注