提交任务的两种方式
对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
同步:提交完任务后就在原地等待,直到任务运行完毕后拿到任务的返回值,再继续运行下一行代码
异步:提交完任务后不在原地等待,直接运行下一行代码,不关心任务的执行过程,任务的返回值不管~~
异步效率高于同步 但是并不是所有任务都可以异步执行,判断一个任务是否可以异步的条件是,任务发起方是否立即需要执行结果
同步不等于阻塞 异步不等于非阻塞 当使用异步方式发起任务时 任务中可能包含io操作 异步也可能阻塞 同步提交任务 也会卡主程序 但是不等同阻塞,因为任务中可能在做一对计算任务,CPU没走
pool.submit(task,i)===>产生对象 j pool.shutdown(wait=True) # shutdown 关闭进程池入口,wait=True,等异步任务都运行完在运行下一行代码
lock、GIL 就是为了达到同步,来保证安全.
而异步在多进程/线程IO的时候能提高效率.
本质就是一把mutex(互斥锁)---限制多个并发线程同一时间只能有一个执行
即,有了GIL的存在,同一进程内的多个线程同一时刻只能有一个在运行,(意味着在Cpython中一个进程下的多个线程无法实现并行===》无法利用多核优势)避免多个线程同时对资源进行读写造成混乱
但不影响并发的实现
GIL可被比喻成执行权限,同一进程下所有线程要想执行都要先抢执行权限
Cpython解释器自带的垃圾回收管理机制不是线程安全的(不能保证多线程同时操作产生的问题)------解决了安全问题 但是降低了效率( 另,虽然有解决方案 但是由于牵涉太多,一旦修改则 很多以前的基于GIL的程序都需要修改,所以变成了历史遗留问题 )
即使在多核处理器下 也无法无法真正的并行。
so,GIL仅存在Cpython中,其他解释器无此问题,因Cpython优势在于有大量C的库可调用,-----其仍为主流
python的垃圾回收管理机制 用的是引用计数
加锁:调用解释器时立即加锁
解锁:1.当前程序遇到IO时
2.时间过长
3.有一个优先级更高的程序替代了它
程序处理关键---IO,非计算
IO密集型:需要大量IO时间
计算密集型:全是计算任务
单核下无论是IO密集型还是计算密集型GIL都不会产生任何影响
多核下,对于IO密集型任务,GIL会有细微的影响
Cpython中IO密集任务应采用多线程,计算密集型应采用多进程---Cpython中线程无法并行执行
死锁与递归锁(可重入锁)
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都
正在使用,所有这两个线程在无外力作用下将一直等待下去。更直观的死锁比如,有两个lock对象,同一资源分别被两个进程的lock.requare阻塞,就造成程序永久的阻塞
解决死锁就可以用递归锁
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:
threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
GIL使用用于保护解释器相关的数据,解释器也是一段程序,肯定有其定义各种数据 GIL并不能保证你自己定义的数据的安全,所以一旦你的程序中出现了多线程共享数据时就需要自己加锁
自定义互斥锁---------保护程序员自己的程序的资源安全(多线程时,保证修改共享数据时有序的修改,不会产生数据修改混乱) GIL ----------保护解释器资源的安全(保证同一时刻只有一个线程能使用到cpu)
GIL会在任务无法执行(无执行权限)时被强制释放,互斥锁即便无法执行也不会自动释放
线程拿到公共数据--申请GIL---调用解释器中程序--到操作系统---CPU执行--执行时间到,释放GIL---再抢
当我们使用多线程的时候,每一个进程中只有一个GIL,那么这多个线程中谁拿到GIL,谁就可以使用cpu(ps:多个进程有多个Gil,但每个进程中只有一个GIL),所以当python用cpython作为解释器的时候,多线程就不是真正意义上的多线程,属于伪并发的多线程。
线程先得到cpu使用权 GIL锁使得线程可以使用解释器,得到lock 使得线程可以执行
- 首先假设只有一个进程,这个进程中有两个线程 Thread1,Thread2, 要修改共享的数据date, 并且有互斥锁: 执行以下步骤: 多线程运行,假设Thread1获得GIL可以使用cpu,这时Thread1获得 互斥锁lock,Thread1可以改date数据(但并没有开始修改数据); Thread1线程在修改date数据前发生了 i/o操作 或者 ticks计数满100((注意就是没有运行到修改data数据),这个时候 Thread1 让出了Gil,Gil锁可以被竞争); Thread1 和 Thread2 开始竞争Gil (注意:如果Thread1是因为i/o 阻塞 让出的Gil,Thread2必定拿到Gil,如果Thread1是因为ticks计数满100让出Gil这个时候Thread1 和 Thread2 公平竞争); 假设 Thread2正好获得了GIL, 运行代码去修改共享数据date,由于Thread1有互斥锁lock,所以Thread2无法更改共享数据date,这时Thread2让出Gil锁, GIL锁再次发生竞争; 假设Thread1又抢到GIL,由于其有互斥锁Lock所以其可以继续修改共享数据data,当Thread1修改完数据释放互斥锁lock,Thread2在获得GIL与lock后才可对data进行修改
练习:
执行客户端程序,用户可选的功能有: 1、登录 2、注册 3、上传 4、下载
思路解析: 1、执行登录,输入用户名cgon,密码123,对用户名egon和密码进行hash校验,并加盐处理,将密文密码发送到服务端, 与服务端事先存好用户名与密文密码进行对比,对比成功后, 在服务端内存中用hash算法生成一个随机字符串比如eadc05b6c5dda1f8772c4f4ca64db110 然后将该字符串发送给用户以及登录成功的提示信息发送给客户端,然后在服务端存放好 current_users={ 'a3sc05b6c5dda1f8313c4f4ca64db110':{'uid':0,'username':'alex'}, 'e31adfc05b6c5dda1f8772c4f4ca64b0':{'uid':1,'username':'lxx'}, 'eadc05b6c5dda1f8772c4f4ca64db110':{'uid':2,'username':'cgon'}, } 这个不太合适吧 反过来存? 用户在收到服务端发来的'eadc05b6c5dda1f8772c4f4ca64db110'以及登录成功的提示信息后,以后的任何操作都会携带该随 机字符串'eadc05b6c5dda1f8772c4f4ca64db110‘,服务端会根据该字符串获取用户信息来进行与该用户匹配的操作 在用户关闭连接后,服务端会从current_users字典中清除用户信息,下次重新登录,会产生新的随机字符串 这样做的好处: 1、用户的敏感信息全都存放到服务端,更加安全 2、每次登录都拿到一个新的随机的字符串,不容易被伪造 2、执行注册功能,提交到服务端,然后存放到文件中,如果用户已经存在则提示用户已经注册过,要求重新输入用户信息 3、执行上次下载功能时会携带用户的随机字符串到服务端,如果服务端发现该字符串not in current_users,则要求用户先登录
参数 最大进程数,不设置就默认造与CPU核数等量的进程数
最大线程数,不设置就默认造与CPU核数*5
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
池子大小固定,任务数固定,即 ,池的功能是限制启动的进程/线程数。当并发的任务数远超计算机承受能力时--无法一次性开启过多进程/线程数时,就应用池的概念将开启的进程/线程数限制在计算机可承受范围内
与信号量的区别 ,信号量也是一种锁 适用于保证同一时间能有多少个进程或线程访问
而线程/进程池,没有对数据访问进行限制仅仅是控制数量
线程池 from concurrent.futures import ThreadPoolExecutor pool = ThreadPoolExecutor() 进程池 from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor() pool.submit()
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主进程结束')
程序遇到了IO操作,无法继续执行代码,叫做阻塞 程序没有遇到IO操作,正常执行中,就叫非阻塞 它们指的是程序的状态
就绪 运行 阻塞
就绪和阻塞给人的感觉就是卡住了
子线程/进程完成任务时自动调用指定的函数
回调:
A交给B一个任务,B执行完成后回头调用了A的一个函数
为什么需要回调
需要获取异步任务的结果,但是又不应该阻塞
ts,re,os from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # respOnse= requests.get("https://www.baidu.com") # htm = response.content.decode("utf-8") # print(re.findall("href=.*?com",htm)) def get_data(url): print("%s 正在请求%s" % (os.getpid(),url)) response = requests.get(url) print("%s 请求%s成功" % (os.getpid(),url)) return response def parser(res): htm = res.content.decode("utf-8") ls = re.findall("href=.*?com", htm) print("解析完成! 共%s个连接" % len(ls)) if __name__ == '__main__': urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.tmall.com", "https://www.taobao.com", "https://www.jd.com", "https://www.python.org", "https://www.apple.com"] pool = ProcessPoolExecutor(3) objs = [] for i in urls: obj = pool.submit(get_data,i) # res = obj.result() # 会把任务变成串行 # parser(res) objs.append(obj) pool.shutdown() # 请求依然是并发,但是请求的结果不能被立即处理 for i in objs: # 解析数据时串行的 parser(i.result())
通常异步任务都会和回调函数一起使用
通过add_done_callback函数给Future对象绑定一个回调函数
注:在多进程中,回调函数是交给主进程来执行
多线程中,谁有空谁执行回调函数
# 线程池中使用异步回调 def get_data(url): print("%s 正在请求%s" % (current_thread().name, url)) response = requests.get(url) print("%s 请求%s成功" % (current_thread().name, url)) return response def parser(obj): res = obj.result() htm = res.content.decode("utf-8") ls = re.findall("href=.*?com", htm) print("%s解析完成! 共%s个连接" % (current_thread().name,len(ls))) if __name__ == '__main__': urls = ["https://www.baidu.com", "https://www.tmall.com", "https://www.taobao.com", "https://www.jd.com", "https://www.python.org", "https://www.apple.com"] pool = ThreadPoolExecutor(3) for i in urls: obj = pool.submit(get_data, i) # res = obj.result() # 会把任务变成串行 # parser(res) obj.add_done_callback(parser)
import requests, re, os from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from threading import current_thread # respOnse= requests.get("https://www.baidu.com") # htm = response.content.decode("utf-8") # print(re.findall("href=.*?com",htm)) # def get_data(url): # print("%s 正在请求%s" % (os.getpid(), url)) # respOnse= requests.get(url) # print("%s 请求%s成功" % (os.getpid(), url)) # return response # # # def parser(obj): # # res = obj.result() # htm = res.content.decode("utf-8") # ls = re.findall("href=.*?com", htm) # print("%s解析完成! 共%s个连接" % (os.getpid(),len(ls))) # # if __name__ == '__main__': # urls = ["https://www.baidu.com", # "https://www.sina.com", # "https://www.tmall.com", # "https://www.taobao.com", # "https://www.jd.com", # "https://www.python.org", # "https://www.apple.com"] # pool = ProcessPoolExecutor(3) # # for i in urls: # obj = pool.submit(get_data, i) # # res = obj.result() # 会把任务变成串行 # # parser(res) # obj.add_done_callback(parser)
进程队列:可被多进程共享
线程队列:普通容器不能共享
from queue import Queue,LifoQueue,PriorityQueue q = Queue(2) q.put("a") q.put("b",timeout=1) print(q.get()) # a # print(q.get(timeout=2)) # b
from queue import Queue,LifoQueue,PriorityQueue lq = LifoQueue() lq.put("a") lq.put("b") lq.put("c") print(lq.get()) # c print(lq.get()) # b print(lq.get()) # a
from queue import Queue,LifoQueue,PriorityQueue pq = PriorityQueue() pq.put((2,"b")) pq.put((3,"c")) pq.put((1,"a")) print(pq.get()) # (1, 'a') print(pq.get()) # (2, 'b') print(pq.get()) # (3, 'c') pq.put((["b"],"csdlkjfksdjkfds")) pq.put((["a"],"bdslkfjdsfjd")) pq.put((["c"],"asd;kjfksdjfkdsf")) print(pq.get()) # (['a'], 'bdslkfjdsfjd') print(pq.get()) # (['b'], 'csdlkjfksdjkfds') print(pq.get()) # (['c'], 'asd;kjfksdjfkdsf')
Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等方法用于实现线程间的通信。
1 设置信号
使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的状态。当使用event对象的set()方法后,isSet()方法返回真
2 清除信号
使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假
3 等待
Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。
4 判断是否设置标志位(信号)
event.is_set()
import threading, time class Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>可以下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__ == "__main__": event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
import time from threading import Thread, Event e = Event() def start(): print("began start!") time.sleep(2) print("server start!") e.set() def conn(): while True: print("try connect...") e.wait(timeout=1) # 在这里阻塞 等1s 之后还没有isSet() 就退出 if e.isSet(): print("connect successful!") break else: print("connect filed!") Thread(target=start).start() Thread(target=conn).start()
协调多个线程工作----当一个线程需要执行某个操作,需获取另一个线程状态
通过事件在多线程间进行消息传递
import time from threading import Thread from threading import Event # 使用变量类完成多线程协作 is_boot = False def start(): global is_boot print("正在启动服务器......") time.sleep(5) print("服务器启动成功!") is_boot = True def connect(): while True: if is_boot: print("连接服务器成功!") break else: print("连接服务器失败!") time.sleep(0.5) Thread(target=start).start() Thread(target=connect).start()
import time from threading import Thread from threading import Event # 创建一个事件 e = Event() #默认False def start(): print("正在启动服务器......") time.sleep(5) print("服务器启动成功!") e.set() # 就是把事件的值设置为True def connect(): # 重试3次 for i in range(3): print("等待服务器启动....") e.wait(1) # 会阻塞 直到对方把事件设置为True if e.isSet(): print("连接成功!") break else: print("连接失败") else: #如果3次都没成功 就打印这个消息 print("服务器没有启动") Thread(target=start).start() Thread(target=connect).start()