1.锁
lock.acquire()# 上锁
lock.release()# 解锁#同一时间允许一个进程上一把锁 就是Lock
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲速度却保证了数据安全。#同一时间允许多个进程上多把锁 就是[信号量Semaphore] 信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁# 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性
# 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对."""互斥锁lock:互斥锁就是进程的相互排斥谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性# 注意:多个锁一起上,不开锁,会造成死锁。上锁和解锁是一对"""from multiprocessing import Process,Lockimport time # 创建一把锁 lock = lock()# 上锁lock.acquire()print("111")lock.release()# 死锁:只上锁,不解锁,会阻塞,产生死锁lock.acquire()# lock.release()lock,release()# lock.release()lock.acquire()# lock.release()print("222")# 111# 222
例子:
"""比如12306的抢票软件,实际上是把mysql数据库中的内容读出来先与存在redis的内容中,比如内存的读写的速度极快,不需要再刻意的加延迟等待,既可以把该有的数据更新或者返回。等待抢票的高峰期过了,在慢慢把内存中的数据更新会Mysql当中"""
# (1) 模拟抢票软件import json# 读取票数,更新票数def wr_info(sign,dic=None): if sign == "r": with open("ticket",mode="r",encoding="utf-8") as fp: dic = json.load(fp) return dic elif sign == "w": with open("ticket",mode="w",encoding="utf-8") as fp: json.dump(dic,fp) # 抢票方法def get_ticket(person): # 读取数据库的实际票数 dic = wr_info("r") # with open("ticket",mode="r",encoding="utf-8") as fp: # dic = json.load(fp) time.sleep(0.1) if dic["count"] > 0: print("%s抢到票了" % (person) ) dic["count"] -= 1 # 更新数据库 wr_info("w",dic) # with open("ticket",mode="w",encoding="utf-8") as fp: # json.dump(dic,fp) else: print("%s没有买到这个票" % (person))# 用ticket来进行函数的统一调用def ticket(person,lock): # 查询票数 dic = wr_info("r") # with open("ticket",mode="r",encoding="utf-8") as fp: # dic = json.load(fp) print("%s 查询余票 : %s" % (person,dic["count"])) lock.acquire() # 开始抢票 get_ticket(person) lock.release()if __name__ == "__main__": lock = Lock() for i in range(10): p = Process(target=ticket,args=("person%s" % (i),lock) ) p.start()## 思路:1.写一个查票和更新票的方法 def wr_info(sign,dic=None)# 就行对数据库的操作,本例这个地方对文件就行操作,查"r"and "w".如果查询到数据之后会把数据返回回来# 由于操作的是字典会使用json# 2.写一个抢票的方法 def get_ticket(person)# 先查询有没有票,如果有票就行操作字典中的数据,dic['count'] -= 1# 把数据更新到数据库,也就是调用def wr_info("w",dic)# 如果没有票的话就行打印消息提示用户# 3、写一个ticket总体就行调用 def ticket(proson,lock)# # 1.查询票数 2.开始抢票 使用lock.acquire() 和lock.acquire()就行对抢票方法中的结果就行封锁,谁先占用资源谁先用# 4.在总程序下方写一个接口程序,就行多进程并发,抢票# 注意Process(target=ticcket,args=("person%s" %(i),lock)# 其中的args是传入并发进程执行的方法中的参数,注意是元祖格式# ticket文件中的数据是 {'count':1}# person0 查询余票 : 1# person1 查询余票 : 1# person2 查询余票 : 1# person4 查询余票 : 1# person0抢到票了# person3 查询余票 : 0# person1没有买到这个票# person2没有买到这个票# person5 查询余票 : 0# person8 查询余票 : 0# person7 查询余票 : 0# person4没有买到这个票# person6 查询余票 : 0# person9 查询余票 : 0# person3没有买到这个票# person5没有买到这个票# person8没有买到这个票# person7没有买到这个票# person6没有买到这个票# person9没有买到这个票
# (2)区分同步和异步# 在产生进程对象的时候,进程之间是异步 上锁之后 进程之间变成同步
def func(num,lock): lock.acquire() print("走到上锁这个地方,变成一个同步程序,先来的进程先执行,后来的进程后执行,按次序依次执行") print(num) lock.release() if __name__ == "__main__": # lock互斥锁 , 进程之间数据不共享, # 但是lock对象底层是通过socket来互相发送数据,不管多少个进程,都是同一个lock锁. lock = Lock() for i in range(10): p = Process(target=func,args=(i,lock)) # 1.10个子进程异步执行,是并发操作 p.start()# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 1# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 0# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 2# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 3# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 5# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 4# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 6# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 8# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 9# 走到上锁这个地方,变成一个同步程序,先来的程序先执行,后来的进程后执行,按照次序依次执行# 7
2.信号量
基本概念:
#同一时间允许多个进程上多把锁 就是[信号量Semaphore] 信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁 # 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性# 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
### 信号量 Semaphore 本质上就是锁 支部所可以控制锁的数量
from multiprocessing import Process,Semaphoreimport random,timedef ktv(person,sem): # 上锁 sem.acquire() print("%s进入ktv唱歌"%person) time.sleep(random.randrange(3,8)) print("%s走出ktv离开"%(person)) # 解锁 sem.release()if __name__ == '__main__': # 同一时间最多不允许4个进程执行ktv任务,剩下的进程等待 sem = Semaphore(4) for i in range(10): p = Process(target=ktv,args=("person%s"(i),sem)) p.start()# person1进入ktv唱歌# person0进入ktv唱歌# person2进入ktv唱歌# person4进入ktv唱歌# person1走出ktv离开# person0走出ktv离开# person6进入ktv唱歌# person3进入ktv唱歌# person2走出ktv离开# person8进入ktv唱歌# person4走出ktv离开# person7进入ktv唱歌# person3走出ktv离开# person9进入ktv唱歌# person8走出ktv离开# person5进入ktv唱歌# person7走出ktv离开# person9走出ktv离开# person6走出ktv离开# person5走出ktv离开 # 小结:一次性最多只有4个人在ktv唱歌
3.事件
# 阻塞事件 :
e = Event()生成事件对象e e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False] # 如果是True 不加阻塞 # 如果是False 加阻塞# 控制这个属性的值
# set()方法 将这个属性的值改成True # clear()方法 将这个属性的值改成False # is_set()方法 判断当前的属性是否为True (默认上来是False)e = Event()# 获取该对象中的成员属性值是True还是Flaseprint(e.is_set()) # Falseprint(1) # 1# 用e.set() 把该对象的成员值变成Truee.set() # Trueprint(e.is_set()) # True(定义)e.wait() # 动态的给程序增加堵塞,当前的e.is_set()是True故是非堵塞(实现)print(2) # 2e.clear() # 将e.is_set()的属性该为Falseprint(3)e.wait() # 动态的给程序加堵塞,此时的e.is_set是Fasle,那么后面的程序就不会执行了print(4)print(5)# False# 1# True# 2# 3
例子:
# ### 模拟红绿灯效果 def traffic_light(e): print("红灯亮") while True: if e.is_set(): # 为当前绿灯等待1秒 time.sleep(1) # 切换成红灯 print("红灯亮") e.clear() else: # 为当前红灯等待1秒 time.sleep(1) # 等完1秒之后,变成绿灯 print("绿灯亮") e.set()# 小车在遇到红灯时婷, 绿灯时行.def car(e, i): if not e.is_set(): print("car %s 在等待" % (i)) # wait 获取的值是False , 所以阻塞 e.wait() print("car %s 通行了" % (i))# 车跑完了,但是红绿灯仍然在执行.if __name__ == "__main__": e = Event() # 启动交通灯 p1 = Process(target=traffic_light, args=(e,)) p1.start() # 开始跑车 for i in range(20): time.sleep(random.randrange(0, 2)) p2 = Process(target=car, args=(e, i)) p2.start()# 红灯亮# 绿灯亮# car 0 通行了# 红灯亮# car 1 在等待# car 2 在等待# car 3 在等待# car 4 在等待# 绿灯亮# car 3 通行了# car 2 通行了# car 1 通行了# car 4 通行了# car 5 通行了# 红灯亮# car 6 在等待# 绿灯亮# car 6 通行了# car 7 通行了# 红灯亮# car 8 在等待# car 9 在等待# 绿灯亮# car 8 通行了# car 9 通行了# car 10 通行了# car 11 通行了# 红灯亮# car 12 在等待# car 13 在等待# 绿灯亮# car 13 通行了# car 12 通行了# car 14 通行了# 红灯亮# car 15 在等待# car 16 在等待# 绿灯亮# car 16 通行了# car 15 通行了# car 17 通行了# 红灯亮# car 18 在等待# car 19 在等待# 绿灯亮# car 19 通行了# car 18 通行了# 红灯亮# 绿灯亮# 红灯亮# 绿灯亮 ..... 优化版:
# ### 优化版:"""# 车跑完了,红绿灯这个进程也结束掉.if __name__ == "__main__": e = Event() lst = [] # 启动交通灯 p1 = Process(target=traffic_light,args = (e,)) p1.daemon = True p1.start() for i in range(20): time.sleep(random.randrange(0,2)) p2 = Process(target=car,args=(e,i)) p2.start() lst.append(p2) # 循环等待每一辆车通过红绿灯 for i in lst: i.join() print("程序彻底执行结束")"""
4.queue队列
# IPC Inter-Process Communication
# 实现进程之间通信的两种机制:# 管道 Pipe# 队列 Queue# put() 存放# get() 获取# get_nowait() 拿不到报异常# put_nowait() 非阻塞版本的putq.empty() 检测是否为空 (了解)q.full() 检测是否已经存满 (了解)import queuefrom multiprocessing import Process,Queue# (1)基本语法"""先进先出"""q = Queue()# 1.把数据放到q队列中 putq.put(111)# 2.把数据从队列里面拿出来 get# res = q.get()# print(res)# 3.当队列里面的值都拿出来,已经没有数据的时候,在获取会阻塞.# res = q.get() 直接阻塞# 4.get_nowait() 无论有没有都拿,如果拿不到,直接报错# get_nowait 内部要依赖queue模块来实现# 没有完全优化好,不推荐使用,想用就用put和get分别设置和获取.就可以了"""# res = q.get_nowait() 不推荐使用# print(res)# try .. except .. 捕捉异常"""try: print(q.get_nowait())# 特指queue.Empty 这种错误,执行某些逻辑.except queue.Empty: print("empty")except: print("其他")
# (2) 可以使用queue 指定队列的长度
"""最多放3个,超过最大长度,就执行阻塞"""q = Queue(3)q.put(1)q.put(2)q.put(3)# print(q.get())# q.put(4) 阻塞的情况# q.put_nowait(4) 超过队列最大长度,在存值直接报错 (不推荐使用)# (了解 不常用 full empty)"""# 队列值满了返回真,不满返回假res = q.full()print(res)# 判断队列中是否为空res = q.empty()print(res)"""
# (3) 进程之间的通讯def func(q): # 1.主进程添加的值,子进程可以通过队列拿到. res = q.get() print("我是子进程", res) q.put("a2222")if __name__ == "__main__": q1 = Queue() p = Process(target=func, args=(q1,)) p.start() q1.get() # 主进程添加数据 q1.put("a111") p.join() # 2.子进程添加的值,主进程通过队列可以拿到 print("主进程:", q1.get())
5.生产者与消费者
"""
# 爬虫例子:1号进程负责获取网页中所有内容2号进程负责匹配提取网页中的关键字1号进程就可以看成一个生产者
2号进程就可以看成一个消费者有时可能生产者比消费者快,反之也是一样
所以生产者和消费者为了弥补之间速度的差异化,加了一个中间的缓冲队列.生产者和消费者模型从程序上看就是一个存放和拿取的过程.
最为理想的生产设消费者模型是两者之间的运行速度相对同步."""
from multiprocessing import Process,Queueimport random,time# 消费者模型 [负责取值]def consumer(q,name): while True: food = q.get() if food is None: break time.sleep(random.uniform(0.1,1)) print("1. %s 吃了一个%s" % (name,food)) # 生产者模型 [负责存值]def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) print("2. %s 生产了 %s %s" % (name,food,i)) q.put(food+str(i)) if __name__ == "__main__": q = Queue() # 创建消费者进程对象 # 1号消费者 c1 = Process(target=consumer,args=(q,"马俊强")) # 如果设置守护进程,主进程结束,当前消费者模型结束,不能够保证所有数据消费完毕.是一个弊端,并不理想. # c1.daemon = True c1.start() # 2号消费者 c2 = Process(target=consumer,args=(q,"境泽年")) c2.start() # 创建生产者进程对象 # 1号生产者 p1 = Process(target=producer,args=(q,"张国成","方便面")) p1.start() # 2号生产者 p2 = Process(target=producer,args=(q,"易思","大力丸")) p2.start() # 等待2个生产者进程执行完毕.在向下执行. p1.join() p2.join() """ # 四个进程,2个生产者,2个消费者,添加一个None可以让其中一个消费者终止 还有一个消费者,需要通过另一个生产者添加一个None值,来让第二个消费者终止 所以添加两个None """ q.put(None) q.put(None) 运行结果:
2. 张国成 生产了 方便面 0
2. 易思 生产了 大力丸 01. 马俊强 吃了一个方便面02. 张国成 生产了 方便面 11. 境泽年 吃了一个大力丸02. 易思 生产了 大力丸 11. 马俊强 吃了一个方便面11. 境泽年 吃了一个大力丸12. 张国成 生产了 方便面 22. 张国成 生产了 方便面 32. 易思 生产了 大力丸 22. 易思 生产了 大力丸 31. 境泽年 吃了一个方便面31. 境泽年 吃了一个大力丸21. 马俊强 吃了一个方便面22. 张国成 生产了 方便面 42. 易思 生产了 大力丸 41. 境泽年 吃了一个大力丸31. 马俊强 吃了一个方便面41. 境泽年 吃了一个大力丸4
6.joinablequeue
"""put 存入get 获取task_done 与 join 通过一个中间变量统计队列元素个数每放入一个值,队列元素个数加1通过task_done,让当前队列的元素数量减1最后join查找统计队列数的这个变量,如果是0,才不会添加阻塞,放行.join判断如果队列里面还有值,默认是要加阻塞的.用法:每次get一个数据,就执行一次 task_done(),可以让中间变量的值减1"""
# (1) 基本用法jq = JoinableQueue()jq.put(11)print(jq.get())jq.task_done()jq.join()print("finish")# # 11# finish
# ### JoinableQueuefrom multiprocessing import Process,JoinableQueueimport time,random# (2) 优化生产者消费者模型;# 消费者模型 [负责取值]def consumer(q, name): while True: food = q.get() time.sleep(random.uniform(0.1, 1)) print("1. %s 吃了一个%s" % (name, food)) q.task_done()# 生产者模型 [负责存值]def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("2. %s 生产了 %s %s" % (name, food, i)) q.put(food + str(i))if __name__ == "__main__": jq = JoinableQueue() # 创建消费者 c1 = Process(target=consumer, args=(jq, "罗婷")) # 伴随主进程结束而结束 c1.daemon = True c1.start() # 创建生产者 p1 = Process(target=producer, args=(jq, "张国成", "香水")) p1.start() p1.join() # 必须等队列里面的所有数据都被清空 ,判定值为0之后才放行 jq.join() print("全部结束") # 2. 张国成 生产了 香水 0 # 2. 张国成 生产了 香水 1 # 1. 罗婷 吃了一个香水0 # 1. 罗婷 吃了一个香水1 # 2. 张国成 生产了 香水 2 # 2. 张国成 生产了 香水 3 # 1. 罗婷 吃了一个香水2 # 2. 张国成 生产了 香水 4 # 1. 罗婷 吃了一个香水3 # 1. 罗婷 吃了一个香水4 # 全部结束
# 1.把c1创建为守护进程之后 如果是下面程序中,由于是守护进程,(目的是让#消费者这行代码执行完之后推出循环)一旦总程序执行到最后一句的时候程序就会# 停止,会出现 生产了很多,还没有消费完的情况#2.基于第一种情况下 我们需要在末尾边上再次像序列中添加一个值即是程序中的q.put() =None#这样就可以推出循环了,不要使用守护进程的方式。# 3.但是在上面的做法中由于jq.join()方式,如果是0 才不会添加堵塞放行,所以这种方式就可以使用守护# 进程from multiprocessing import Process, Queueimport random, time# 消费者模型 [负责取值]def consumer(q, name): while True: food = q.get() if food is None: break time.sleep(random.uniform(0.1, 1)) print("1. %s 吃了一个%s" % (name, food))# 生产者模型 [负责存值]def producer(q, name, food): for i in range(5): time.sleep(random.uniform(0.1, 1)) print("2. %s 生产了 %s %s" % (name, food, i)) q.put(food + str(i))if __name__ == "__main__": q = Queue() # 创建消费者进程对象 # 1号消费者 c1 = Process(target=consumer, args=(q, "马俊强")) # 如果设置守护进程,主进程结束,当前消费者模型结束,不能够保证所有数据消费完毕.是一个弊端,并不理想. c1.daemon = True c1.start() # 创建生产者进程对象 # 1号生产者 p1 = Process(target=producer, args=(q, "张国成", "方便面")) p1.start() # 等待2个生产者进程执行完毕.在向下执行. p1.join() """ # 四个进程,2个生产者,2个消费者,添加一个None可以让其中一个消费者终止 还有一个消费者,需要通过另一个生产者添加一个None值,来让第二个消费者终止 所以添加两个None """ # q.put(None)# 2. 张国成 生产了 方便面 0# 2. 张国成 生产了 方便面 1# 1. 马俊强 吃了一个方便面0# 2. 张国成 生产了 方便面 2# 2. 张国成 生产了 方便面 3# 1. 马俊强 吃了一个方便面1# 1. 马俊强 吃了一个方便面2# 2. 张国成 生产了 方便面 4