博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Day30 python 锁、信号量、事件、queue队列、生产者与消费者、joinablequeue
阅读量:4593 次
发布时间:2019-06-09

本文共 12468 字,大约阅读时间需要 41 分钟。

1.锁

lock.acquire()# 上锁

lock.release()# 解锁

#同一时间允许一个进程上一把锁 就是Lock

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲速度却保证了数据安全。
#同一时间允许多个进程上多把锁 就是[信号量Semaphore]
信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁

# 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性

# 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.

"""互斥锁lock:互斥锁就是进程的相互排斥谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性# 注意:多个锁一起上,不开锁,会造成死锁。上锁和解锁是一对"""from multiprocessing import Process,Lockimport time # 创建一把锁 lock = lock()# 上锁lock.acquire()print("111")lock.release()# 死锁:只上锁,不解锁,会阻塞,产生死锁lock.acquire()# lock.release()lock,release()# lock.release()lock.acquire()# lock.release()print("222")# 111# 222

例子:

"""比如12306的抢票软件,实际上是把mysql数据库中的内容读出来先与存在redis的内容中,比如内存的读写的速度极快,不需要再刻意的加延迟等待,既可以把该有的数据更新或者返回。等待抢票的高峰期过了,在慢慢把内存中的数据更新会Mysql当中"""
# (1) 模拟抢票软件import json# 读取票数,更新票数def wr_info(sign,dic=None):    if sign == "r":        with open("ticket",mode="r",encoding="utf-8") as fp:            dic = json.load(fp)        return dic    elif sign == "w":        with open("ticket",mode="w",encoding="utf-8") as fp:            json.dump(dic,fp)            # 抢票方法def get_ticket(person):    # 读取数据库的实际票数    dic = wr_info("r")    # with open("ticket",mode="r",encoding="utf-8") as fp:        # dic = json.load(fp)    time.sleep(0.1)    if dic["count"] > 0:        print("%s抢到票了" % (person) )        dic["count"] -= 1        # 更新数据库        wr_info("w",dic)        # with open("ticket",mode="w",encoding="utf-8") as fp:            # json.dump(dic,fp)    else:        print("%s没有买到这个票" % (person))# 用ticket来进行函数的统一调用def ticket(person,lock):    # 查询票数    dic = wr_info("r")    # with open("ticket",mode="r",encoding="utf-8") as fp:        # dic = json.load(fp)    print("%s 查询余票 : %s" % (person,dic["count"]))    lock.acquire()        # 开始抢票    get_ticket(person)    lock.release()if __name__ == "__main__":    lock = Lock()    for i in range(10):        p = Process(target=ticket,args=("person%s" % (i),lock) )        p.start()## 思路:1.写一个查票和更新票的方法 def wr_info(sign,dic=None)# 就行对数据库的操作,本例这个地方对文件就行操作,查"r"and "w".如果查询到数据之后会把数据返回回来# 由于操作的是字典会使用json#  2.写一个抢票的方法 def get_ticket(person)# 先查询有没有票,如果有票就行操作字典中的数据,dic['count'] -= 1# 把数据更新到数据库,也就是调用def wr_info("w",dic)# 如果没有票的话就行打印消息提示用户# 3、写一个ticket总体就行调用 def ticket(proson,lock)# # 1.查询票数 2.开始抢票 使用lock.acquire() 和lock.acquire()就行对抢票方法中的结果就行封锁,谁先占用资源谁先用# 4.在总程序下方写一个接口程序,就行多进程并发,抢票# 注意Process(target=ticcket,args=("person%s" %(i),lock)# 其中的args是传入并发进程执行的方法中的参数,注意是元祖格式# ticket文件中的数据是 {'count':1}# person0 查询余票 : 1# person1 查询余票 : 1# person2 查询余票 : 1# person4 查询余票 : 1# person0抢到票了# person3 查询余票 : 0# person1没有买到这个票# person2没有买到这个票# person5 查询余票 : 0# person8 查询余票 : 0# person7 查询余票 : 0# person4没有买到这个票# person6 查询余票 : 0# person9 查询余票 : 0# person3没有买到这个票# person5没有买到这个票# person8没有买到这个票# person7没有买到这个票# person6没有买到这个票# person9没有买到这个票
# (2)区分同步和异步# 在产生进程对象的时候,进程之间是异步 上锁之后 进程之间变成同步
def func(num,lock):    lock.acquire()    print("走到上锁这个地方,变成一个同步程序,先来的进程先执行,后来的进程后执行,按次序依次执行")    print(num)    lock.release()    if __name__ == "__main__":    # lock互斥锁 , 进程之间数据不共享,    # 但是lock对象底层是通过socket来互相发送数据,不管多少个进程,都是同一个lock锁.    lock = Lock()    for i in range(10):        p = Process(target=func,args=(i,lock))        # 1.10个子进程异步执行,是并发操作        p.start()# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 1# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 0# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 2# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 3# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 5# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 4# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 6# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 8# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 9# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 7

 

2.信号量

基本概念:

#同一时间允许多个进程上多把锁 就是[信号量Semaphore]    信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁    # 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性# 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.

 

### 信号量 Semaphore 本质上就是锁 支部所可以控制锁的数量

from multiprocessing import Process,Semaphoreimport random,timedef ktv(person,sem):    # 上锁    sem.acquire()    print("%s进入ktv唱歌"%person)    time.sleep(random.randrange(3,8))    print("%s走出ktv离开"%(person))    # 解锁    sem.release()if __name__ == '__main__':    # 同一时间最多不允许4个进程执行ktv任务,剩下的进程等待    sem = Semaphore(4)    for i in range(10):        p = Process(target=ktv,args=("person%s"(i),sem))        p.start()# person1进入ktv唱歌# person0进入ktv唱歌# person2进入ktv唱歌# person4进入ktv唱歌# person1走出ktv离开# person0走出ktv离开# person6进入ktv唱歌# person3进入ktv唱歌# person2走出ktv离开# person8进入ktv唱歌# person4走出ktv离开# person7进入ktv唱歌# person3走出ktv离开# person9进入ktv唱歌# person8走出ktv离开# person5进入ktv唱歌# person7走出ktv离开# person9走出ktv离开# person6走出ktv离开# person5走出ktv离开 # 小结:一次性最多只有4个人在ktv唱歌

 

3.事件

# 阻塞事件 :

e = Event()生成事件对象e
e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
# 如果是True 不加阻塞
# 如果是False 加阻塞

# 控制这个属性的值

# set()方法 将这个属性的值改成True
# clear()方法 将这个属性的值改成False
# is_set()方法 判断当前的属性是否为True (默认上来是False)

e = Event()# 获取该对象中的成员属性值是True还是Flaseprint(e.is_set())  # Falseprint(1)           # 1# 用e.set() 把该对象的成员值变成Truee.set() # Trueprint(e.is_set()) # True(定义)e.wait() # 动态的给程序增加堵塞,当前的e.is_set()是True故是非堵塞(实现)print(2) # 2e.clear() # 将e.is_set()的属性该为Falseprint(3)e.wait() # 动态的给程序加堵塞,此时的e.is_set是Fasle,那么后面的程序就不会执行了print(4)print(5)# False# 1# True# 2# 3

 

例子:

# ### 模拟红绿灯效果 def traffic_light(e):    print("红灯亮")    while True:        if e.is_set():            # 为当前绿灯等待1秒            time.sleep(1)            # 切换成红灯            print("红灯亮")            e.clear()        else:            # 为当前红灯等待1秒            time.sleep(1)            # 等完1秒之后,变成绿灯            print("绿灯亮")            e.set()# 小车在遇到红灯时婷, 绿灯时行.def car(e, i):    if not e.is_set():        print("car %s 在等待" % (i))        # wait 获取的值是False , 所以阻塞        e.wait()    print("car %s 通行了" % (i))# 车跑完了,但是红绿灯仍然在执行.if __name__ == "__main__":    e = Event()    # 启动交通灯    p1 = Process(target=traffic_light, args=(e,))    p1.start()    # 开始跑车    for i in range(20):        time.sleep(random.randrange(0, 2))        p2 = Process(target=car, args=(e, i))        p2.start()# 红灯亮# 绿灯亮# car 0 通行了# 红灯亮# car 1 在等待# car 2 在等待# car 3 在等待# car 4 在等待# 绿灯亮# car 3 通行了# car 2 通行了# car 1 通行了# car 4 通行了# car 5 通行了# 红灯亮# car 6 在等待# 绿灯亮# car 6 通行了# car 7 通行了# 红灯亮# car 8 在等待# car 9 在等待# 绿灯亮# car 8 通行了# car 9 通行了# car 10 通行了# car 11 通行了# 红灯亮# car 12 在等待# car 13 在等待# 绿灯亮# car 13 通行了# car 12 通行了# car 14 通行了# 红灯亮# car 15 在等待# car 16 在等待# 绿灯亮# car 16 通行了# car 15 通行了# car 17 通行了# 红灯亮# car 18 在等待# car 19 在等待# 绿灯亮# car 19 通行了# car 18 通行了# 红灯亮# 绿灯亮# 红灯亮# 绿灯亮   ..... 优化版:
# ### 优化版:"""# 车跑完了,红绿灯这个进程也结束掉.if __name__ == "__main__":    e = Event()    lst = []            # 启动交通灯    p1 = Process(target=traffic_light,args = (e,))    p1.daemon = True    p1.start()        for i in range(20):        time.sleep(random.randrange(0,2))        p2 = Process(target=car,args=(e,i))        p2.start()        lst.append(p2)        # 循环等待每一辆车通过红绿灯    for i in lst:        i.join()            print("程序彻底执行结束")"""
 

4.queue队列

# IPC Inter-Process Communication

# 实现进程之间通信的两种机制:
# 管道 Pipe
# 队列 Queue
# put() 存放
# get() 获取
# get_nowait() 拿不到报异常
# put_nowait() 非阻塞版本的put
q.empty() 检测是否为空 (了解)
q.full() 检测是否已经存满 (了解)

import queuefrom multiprocessing import Process,Queue# (1)基本语法"""先进先出"""q = Queue()# 1.把数据放到q队列中 putq.put(111)# 2.把数据从队列里面拿出来 get# res = q.get()# print(res)# 3.当队列里面的值都拿出来,已经没有数据的时候,在获取会阻塞.# res = q.get() 直接阻塞# 4.get_nowait() 无论有没有都拿,如果拿不到,直接报错# get_nowait 内部要依赖queue模块来实现# 没有完全优化好,不推荐使用,想用就用put和get分别设置和获取.就可以了"""# res = q.get_nowait()  不推荐使用# print(res)# try .. except .. 捕捉异常"""try:    print(q.get_nowait())# 特指queue.Empty 这种错误,执行某些逻辑.except queue.Empty:    print("empty")except:    print("其他")

 

# (2) 可以使用queue 指定队列的长度

"""最多放3个,超过最大长度,就执行阻塞"""q = Queue(3)q.put(1)q.put(2)q.put(3)# print(q.get())# q.put(4) 阻塞的情况# q.put_nowait(4) 超过队列最大长度,在存值直接报错 (不推荐使用)# (了解 不常用 full empty)"""# 队列值满了返回真,不满返回假res = q.full()print(res)# 判断队列中是否为空res = q.empty()print(res)"""
# (3) 进程之间的通讯def func(q):    # 1.主进程添加的值,子进程可以通过队列拿到.    res = q.get()    print("我是子进程", res)    q.put("a2222")if __name__ == "__main__":    q1 = Queue()    p = Process(target=func, args=(q1,))    p.start()    q1.get()    # 主进程添加数据    q1.put("a111")    p.join()    # 2.子进程添加的值,主进程通过队列可以拿到    print("主进程:", q1.get()) 

 

5.生产者与消费者

 

"""

# 爬虫例子:
1号进程负责获取网页中所有内容
2号进程负责匹配提取网页中的关键字

1号进程就可以看成一个生产者

2号进程就可以看成一个消费者

有时可能生产者比消费者快,反之也是一样

所以生产者和消费者为了弥补之间速度的差异化,加了一个中间的缓冲队列.

生产者和消费者模型从程序上看就是一个存放和拿取的过程.

最为理想的生产设消费者模型是两者之间的运行速度相对同步.
"""

 

from multiprocessing import Process,Queueimport random,time# 消费者模型 [负责取值]def consumer(q,name):    while True:        food = q.get()        if food is None:            break        time.sleep(random.uniform(0.1,1))        print("1. %s 吃了一个%s" % (name,food))    # 生产者模型 [负责存值]def producer(q,name,food):    for i in range(5):        time.sleep(random.uniform(0.1,1))        print("2. %s 生产了 %s %s" % (name,food,i))        q.put(food+str(i))    if __name__ == "__main__":    q = Queue()    # 创建消费者进程对象    # 1号消费者    c1 = Process(target=consumer,args=(q,"马俊强"))    # 如果设置守护进程,主进程结束,当前消费者模型结束,不能够保证所有数据消费完毕.是一个弊端,并不理想.    # c1.daemon = True    c1.start()    # 2号消费者    c2 = Process(target=consumer,args=(q,"境泽年"))    c2.start()                # 创建生产者进程对象    # 1号生产者    p1 = Process(target=producer,args=(q,"张国成","方便面"))    p1.start()    # 2号生产者    p2 = Process(target=producer,args=(q,"易思","大力丸"))    p2.start()        # 等待2个生产者进程执行完毕.在向下执行.    p1.join()    p2.join()    """    # 四个进程,2个生产者,2个消费者,添加一个None可以让其中一个消费者终止    还有一个消费者,需要通过另一个生产者添加一个None值,来让第二个消费者终止    所以添加两个None    """    q.put(None)    q.put(None) 运行结果:

2. 张国成 生产了 方便面 0

2. 易思 生产了 大力丸 0
1. 马俊强 吃了一个方便面0
2. 张国成 生产了 方便面 1
1. 境泽年 吃了一个大力丸0
2. 易思 生产了 大力丸 1
1. 马俊强 吃了一个方便面1
1. 境泽年 吃了一个大力丸1
2. 张国成 生产了 方便面 2
2. 张国成 生产了 方便面 3
2. 易思 生产了 大力丸 2
2. 易思 生产了 大力丸 3
1. 境泽年 吃了一个方便面3
1. 境泽年 吃了一个大力丸2
1. 马俊强 吃了一个方便面2
2. 张国成 生产了 方便面 4
2. 易思 生产了 大力丸 4
1. 境泽年 吃了一个大力丸3
1. 马俊强 吃了一个方便面4
1. 境泽年 吃了一个大力丸4

 

 

6.joinablequeue

 

"""put 存入get 获取task_done 与 join  通过一个中间变量统计队列元素个数每放入一个值,队列元素个数加1通过task_done,让当前队列的元素数量减1最后join查找统计队列数的这个变量,如果是0,才不会添加阻塞,放行.join判断如果队列里面还有值,默认是要加阻塞的.用法:每次get一个数据,就执行一次 task_done(),可以让中间变量的值减1"""
# (1) 基本用法jq = JoinableQueue()jq.put(11)print(jq.get())jq.task_done()jq.join()print("finish")# # 11# finish

 

# ### JoinableQueuefrom multiprocessing import Process,JoinableQueueimport time,random# (2) 优化生产者消费者模型;# 消费者模型 [负责取值]def consumer(q, name):    while True:        food = q.get()        time.sleep(random.uniform(0.1, 1))        print("1. %s 吃了一个%s" % (name, food))        q.task_done()# 生产者模型 [负责存值]def producer(q, name, food):    for i in range(5):        time.sleep(random.uniform(0.1, 1))        print("2. %s 生产了 %s %s" % (name, food, i))        q.put(food + str(i))if __name__ == "__main__":    jq = JoinableQueue()    # 创建消费者    c1 = Process(target=consumer, args=(jq, "罗婷"))    # 伴随主进程结束而结束    c1.daemon = True    c1.start()    # 创建生产者    p1 = Process(target=producer, args=(jq, "张国成", "香水"))    p1.start()    p1.join()    # 必须等队列里面的所有数据都被清空 ,判定值为0之后才放行    jq.join()    print("全部结束")    # 2. 张国成 生产了 香水 0    # 2. 张国成 生产了 香水 1    # 1. 罗婷 吃了一个香水0    # 1. 罗婷 吃了一个香水1    # 2. 张国成 生产了 香水 2    # 2. 张国成 生产了 香水 3    # 1. 罗婷 吃了一个香水2    # 2. 张国成 生产了 香水 4    # 1. 罗婷 吃了一个香水3    # 1. 罗婷 吃了一个香水4    # 全部结束

 

# 1.把c1创建为守护进程之后 如果是下面程序中,由于是守护进程,(目的是让#消费者这行代码执行完之后推出循环)一旦总程序执行到最后一句的时候程序就会# 停止,会出现 生产了很多,还没有消费完的情况#2.基于第一种情况下 我们需要在末尾边上再次像序列中添加一个值即是程序中的q.put() =None#这样就可以推出循环了,不要使用守护进程的方式。# 3.但是在上面的做法中由于jq.join()方式,如果是0 才不会添加堵塞放行,所以这种方式就可以使用守护# 进程from multiprocessing import Process, Queueimport random, time# 消费者模型 [负责取值]def consumer(q, name):    while True:        food = q.get()        if food is None:            break        time.sleep(random.uniform(0.1, 1))        print("1. %s 吃了一个%s" % (name, food))# 生产者模型 [负责存值]def producer(q, name, food):    for i in range(5):        time.sleep(random.uniform(0.1, 1))        print("2. %s 生产了 %s %s" % (name, food, i))        q.put(food + str(i))if __name__ == "__main__":    q = Queue()    # 创建消费者进程对象    # 1号消费者    c1 = Process(target=consumer, args=(q, "马俊强"))    # 如果设置守护进程,主进程结束,当前消费者模型结束,不能够保证所有数据消费完毕.是一个弊端,并不理想.    c1.daemon = True    c1.start()    # 创建生产者进程对象    # 1号生产者    p1 = Process(target=producer, args=(q, "张国成", "方便面"))    p1.start()    # 等待2个生产者进程执行完毕.在向下执行.    p1.join()    """    # 四个进程,2个生产者,2个消费者,添加一个None可以让其中一个消费者终止    还有一个消费者,需要通过另一个生产者添加一个None值,来让第二个消费者终止    所以添加两个None    """    # q.put(None)# 2. 张国成 生产了 方便面 0# 2. 张国成 生产了 方便面 1# 1. 马俊强 吃了一个方便面0# 2. 张国成 生产了 方便面 2# 2. 张国成 生产了 方便面 3# 1. 马俊强 吃了一个方便面1# 1. 马俊强 吃了一个方便面2# 2. 张国成 生产了 方便面 4

 

转载于:https://www.cnblogs.com/longerandergou/p/10981935.html

你可能感兴趣的文章
linux用户组管理
查看>>
Console-算法[for]-输出等腰三角形
查看>>
随机数产生方法小知识点
查看>>
Angular2.js——表单(上)
查看>>
Programming With Objective-C---- Introduction ---- Objective-C 学习(一)
查看>>
正则表达式语法大全
查看>>
DateUtils
查看>>
pb开发的客户端,使用oracle 9i客户端 提示oci.dll could not be loaded
查看>>
wordpress调用指定post type文章怎么操作
查看>>
magento开发手册之目录结构
查看>>
换个红圈1微信头像恶搞一下好友
查看>>
javascript学习_廖大_20170218
查看>>
bzoj2038: [2009国家集训队]小Z的袜子(hose) 莫队
查看>>
火车头采集基本使用
查看>>
MYSQL中插入数据以及修改数据的部分方法
查看>>
unity中遍历动画得到动画名字和动画数量
查看>>
调整WebLogic的时间
查看>>
Linux学习笔记总结--配置iptables防火墙
查看>>
win10 安装mysql
查看>>
SQL文 Update From 写法
查看>>