扯扯python的多线程的同步锁 Lock RLock Semaphore Event Condition


我想大家都知道python的gil限制,记得刚玩python那会,知道了有pypy和Cpython这样的解释器,当时听说是很猛,也就意味肯定是突破了gil的限制,最后经过多方面测试才知道,还是那德行…. 如果你的应用英语那种cpu密集运算的,p大部分情况都推荐使用多进程。

有些扯远了,我个人很喜欢用gevent这种协程的框架,但是不是所有的模块都可以这种用户态的线程… 不得已会用threading… 常用的模块一般都附带线程安全的问题.. 但是如果你自己的扩展模块,有时候会遇到线程安全,也就是线程锁的应用… … 

python的多线程的同步与其他语言基本相同,主要包含:

Lock & RLock :用来确保多线程多共享资源的访问。

Semaphore :简单理解可以理解Lock互斥锁的加强版,他一个锁,可以控制多个thread的访问。。      Lock的话,只能是让一个线程来访问,Semaphore可以控制数目… 

Event : event线程间通信的方式,一个线程可以发送信号,其他的线程接收到信号后执行操作。 

Condition : 虽然他有wait notify这样的机制,实现的效果其实跟Event差不多

咱们先说下Lock和Rlock 

下面是锁定和释放的过程
 
请求锁定 — 进入锁定池等待 — 获取锁 — 已锁定 — 释放锁
 
 
锁 Lock()
 
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
构造方法: 
Lock()
实例方法: 
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。 
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
 

        
# from xiaorui.cc

import threading
lock = threading.Lock()
if mutex.acquire():
    counter += 1
    print "I am %s, set counter:%s" % (self.name, counter)
    mutex.release()
 

重入锁 RLock()

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
构造方法: 
RLock()
实例方法: 

acquire([timeout])/release(): 跟Lock差不多。


Semaphore 信号量对象

信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。

Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。 

直接上代码,我们把semaphore控制为3,也就是说,同时有3个线程可以用这个锁,剩下的线程也之只能是阻塞等待了…

#coding:utf-8
#blog xiaorui.cc
import time
import threading

semaphore = threading.Semaphore(3)

def func():
    if semaphore.acquire():
        for i in range(3):
            time.sleep(1)
            print (threading.currentThread().getName() + '获取锁')
        semaphore.release()
        print (threading.currentThread().getName() + ' 释放锁')


for i in range(5):
  t1 = threading.Thread(target=func)
  t1.start()

Event事件

Event内部包含了一个标志位,初始的时候为false。
可以使用使用set()来将其设置为true;
或者使用clear()将其从新设置为false;
可以使用is_set()来检查标志位的状态;
另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

 
实例: (线程间相互通信)

import threading
import time
 
class MyThread(threading.Thread):
    def __init__(self, signal):
        threading.Thread.__init__(self)
        self.singal = signal
 
    def run(self):
        print "I am %s,I will sleep ..."%self.name
        self.singal.wait()
        print "I am %s, I awake..." %self.name
 
if __name__ == "__main__":
    singal = threading.Event()
    for t in range(0, 3):
        thread = MyThread(singal)
        thread.start()
 
    print "main thread sleep 3 seconds... "
    time.sleep(3)
 
    singal.set()

threading.Condition
可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):

Condition.wait([timeout]):  
wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

Condition.notify():
唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

Condition.notify_all() 
Condition.notifyAll()
唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

对于Condition有个例子,大家可以观摩下。

from threading import Thread, Condition
import time
import random

queue = []
MAX_NUM = 10
condition = Condition()

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            condition.acquire()
            if len(queue) == MAX_NUM:
                print "Queue full, producer is waiting"
                condition.wait()
                print "Space in queue, Consumer notified the producer"
            num = random.choice(nums)
            queue.append(num)
            print "Produced", num
            condition.notify()
            condition.release()
            time.sleep(random.random())


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            condition.acquire()
            if not queue:
                print "Nothing in queue, consumer is waiting"
                condition.wait()
                print "Producer added something to queue and notified the consumer"
            num = queue.pop(0)
            print "Consumed", num
            condition.notify()
            condition.release()
            time.sleep(random.random())


ProducerThread().start()
ConsumerThread().start()




大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注