作者:手机用户2602905773 | 来源:互联网 | 2023-10-12 10:00
池 标准模块(concurrent.futures)ProcessPoolExecutor ThreadPoolExecutor#1介绍concurrent.futures模块
池 标准模块(concurrent.futures) ProcessPoolExecutor ThreadPoolExecutor
#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务
#map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
#result(timeout=None)
取得结果
#add_done_callback(fn)
回调函数
1 import os
2 import time
3 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
4 def func(i):
5 print('thread', i ,os.getpid())
6 time.sleep(1)
7 print('thread %s end' % i)
8 tp = ThreadPoolExecutor(5)
9 if __name__ == '__main__':
10 tp.submit(func, 1)
11 tp.shutdown()
12 print('主线程', os.getpid())
13 #thread 1 12492
14 #thread 1 end
15 #主线程 12492
16
17 import os
18 import time
19 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
20 def func(i):
21 print('thread', i ,os.getpid())
22 time.sleep(1)
23 print('thread %s end' % i)
24 if __name__ == '__main__':
25 tp = ProcessPoolExecutor(5)
26 tp.submit(func, 1)
27 tp.shutdown()
28 print('主线程', os.getpid())
29 #thread 1 10792
30 #thread 1 end
31 #主线程 3016
32
33 #线程中主线程与子线程的pid相同
34 #进程中主进程与子进程的pid不同
进程池和线程池与子进程pid对比
1 import os
2 import time
3 from concurrent.futures import ProcessPoolExecutor
4 def func(i):
5 print('thread', i, os.getpid())
6 time.sleep(1)
7 print('thread %s end' % i)
8 return i * '*'
9 def call_back(arg):
10 print('call back:', os.getpid())
11 print('ret:', arg.result())
12 if __name__ == '__main__':
13 tp = ProcessPoolExecutor(5)
14 ret_lst = []
15 for i in range(20):
16 tp.submit(func, i).add_done_callback(call_back)
17 print('主线程', os.getpid())
18 #回调函数的pid和主进程中的pid相同
19
20 import time
21 from concurrent.futures import ThreadPoolExecutor
22 from threading import current_thread as cthread
23 def func(i):
24 print('thread', i, cthread().ident)
25 time.sleep(1)
26 print('thread %s end' % i)
27 return i * '*'
28 def call_back(arg):
29 print('call back:', cthread().ident)
30 print('ret:', arg.result())
31 tp = ThreadPoolExecutor(5)
32 ret_lst = []
33 for i in range(20):
34 tp.submit(func, i).add_done_callback(call_back)
35 print('主线程', cthread().ident)
36 #回调函数的pid和子线程的pid相同
37
38 #进程池中回调函数是主进程中完成的
39 #线程池中回调函数是子线程中完成的
回调函数的区别
1 import time
2 from concurrent.futures import ThreadPoolExecutor
3 from threading import current_thread as cthread
4 def func(i):
5 print('thread', i, cthread().ident)
6 time.sleep(1)
7 print('thread %s end' % i)
8
9 tp = ThreadPoolExecutor(5)
10 for i in range(20):
11 tp.submit(func, i)
12 tp.shutdown() #close join
13 print('主线程')
14
15
16 import time
17 from concurrent.futures import ThreadPoolExecutor
18 def func(i):
19 print('thread', i)
20 time.sleep(1)
21 print('thread %s end' % i)
22 return i * '*'
23 tp = ThreadPoolExecutor(5)
24 res = tp.map(func, range(20)) #map取代了for+submit
25 for i in res:
26 print(i)
map
回调函数:进程池—由主进程实现的
线程池—由子线程实现的
线程池:实例化线程池 ThreadPoolExcutor 5*cpu_count
异步提交任务 submit / map
阻塞直到任务完成 shutdown
获取子线程的返回值 result
回调函数 add_done_callback