聊聊threading的condition条件变量

     Python下有很多的Lock锁,比如Mutex,Rlock,semaphore…  这些都是比较常用的Lock锁。  然而很多时候我们都忘记threading下还有一个叫做condition的条件变量。   condition内部是含有锁的逻辑,不然也没法保证线程之间的同步。 


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新.

http://xiaorui.cc/?p=3419

那么condition一般用于什么场景?  最多的场景是什么?

线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。在pthread库中通过条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。

通俗的讲,生产者,消费者的模型。 condition很适合那种主动休眠,被动唤醒的场景。 condition使用难度要高于mutex,一不注意就会被死锁,SO一定要理解condition实现后,再用。

     首先我们知道python下的线程是真实的线程,底层用的是pthread。pthread内部Condition条件变量有两个关键函数, await和signal方法,对应python threading Condition是wait和notify方法。 

     一个Condition实例的内部实际上维护了两个队列,一个是等待锁队列,mutex内部其实就是维护了一个队列。 另一个队列可以叫等待条件队列,在这队列中的节点都是由于(某些条件不满足而)线程自身调用wait方法阻塞的线程,记住是自身阻塞。最重要的Condition方法是wait和 notify方法。另外condition还需要lock的支持, 如果你构造函数没有指定lock,condition会默认给你配一个rlock。   
下面是这两个方法的执行流程。
          await方法:
                            1. 入列到条件队列(注意这里不是等待锁的队列)
                            2. 释放锁
                            3. 阻塞自身线程
                             ————被唤醒后执行————-
                            4. 尝试去获取锁(执行到这里时线程已不在条件队列中,而是位于等待(锁的)队列中,参见signal方法)
                                4.1 成功,从await方法中返回,执行线程后面的代码
                                4.2 失败,阻塞自己(等待前一个节点释放锁时将它唤醒)
         注意: 调用wait可以让当前线程休眠,等待其他线程的唤醒,也就是等待signal,这个过程是阻塞的。 当队列首线程被唤醒后,会继续执行await方法中后面的代码。

         signal (notify)方法:

                           1. 将条件队列的队首节点取出,放入等待锁队列的队尾
                           2. 唤醒节点对应的线程.

注: signal ( notify ) 可以把wait队列的那些线程给唤醒起来。  


下面是一个python测试代码.

#xiaorui.cc
!/usr/bin/env python
 -*- coding:utf-8 -*-
mport threading
mport time

ask = []

lass Producer(threading.Thread):
   def run(self):
       while True:
           if con.acquire():
               print "producer role ---------------------"
               if len(task) > 1000:
                   con.wait()
               else:
                   for i in range(100):
                       task.append(i)
                   msg = self.name+' produce 100, count=' + str(len(task))
                   print msg
                   con.notify()
                   time.sleep(5)
                   con.release()

lass Consumer(threading.Thread):
   def run(self):
       while True:
           if con.acquire():
               print "consumer role"
               if len(task) < 100:
                   con.wait()
               else:
                   for i in range(10):
                       task.pop()
                   msg = self.name+' consume 3, count='+str(len(task))
                   print msg
                   con.notify()
                time.sleep(3)
                con.release()

con = threading.Condition()

def test():
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
if __name__ == '__main__':
    test()



END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注