Python 的多线程受到 GIL(Global Interpreter Lock) 的限制,GIL 是一把加到了 Python 的解释器的锁,使得在任意时刻只允许一个 Python 进程使用 Python 解释器,也就是任意时刻,Python 只有一个线程在运行。
GIL 严重影响了计算密集型(CPU-bound) 的多线程程序,此时的多线程与单线程性能没什么差异,也发挥不了多核的威力。但对 I/O 密集型(I/O-bound) 影响不大,因为 CPU 多数时候是在等待。
为了突破 GIL 的 CPU 密集型程序的限制,可以使用非 CPython 解释器,如 Jython, IronPython 或 PyPy, 更为现实的做法就是使用子进程来替代线程去承担较为繁重的计算任务,因为 GIL 是加在进程上的,所以新的进程有独立的 GIL.
新建子进程来处理任务
需用到 multiprocessing.Process 类,这个类在 Python 2.6 就开始存在了。一般的编程语言创建一个进程是要去执行一个外部任命,然后获得它的输出,而 Python 可以像创建线程一样创建子进程,且欲执行的任务直接由 Python 在其中编写
from datetime import datetime
import time
from multiprocessing import Process
import osdef job(name):for _ in range(3):print('[Child {}][{}]'.format(os.getpid(), datetime.now()))time.sleep(1)print(f'sub process {os.getpid()} {name} done')if __name__ == '__main__':p = Process(target=job, args=('bob',)) p.start()print(f'main process {os.getpid()} done') p.join()
from datetime import datetimeimport timefrom multiprocessing import Processimport osdef job ( name ) :for _ in range ( 3 ) :print ( '[Child {}][{}]' . format ( os . getpid ( ) , datetime . now ( ) ) )time . sleep ( 1 )print ( f 'sub process {os.getpid()} {name} done' )if __name__ == '__main__' :p = Process ( target = job , args = ( 'bob' , ) ) p . start ( )print ( f 'main process {os.getpid()} done' ) p . join ( )
执行后输出
main process 67474 done [Child][2021-09-02 19:32:14.121689] [Child][2021-09-02 19:32:15.126048] [Child][2021-09-02 19:32:16.129741] sub process 67476 bob done
可以看到主进程与子进程分别有自己不同的进程 ID。
还有就是 if name == ‘main’ 变得是必要的语句了,不能把想要立即执行的代码丢在函数外了事,否则就是出现下面的错误信息
RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() ... The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable.
进程之间传递参数除了用 args=(‘bob’,) 的任务参数外,还可用 multiprocessing.Pipe 在进程之间双向通信。也可以通过内存来共享数据,用到 multiprocessing.Value 和 multiprocessing.Array 。这一部分的详细内容请参考官方文档,在实际编程中可能会经常用到。
执行进程任务也可以加锁,用 multiprocessing.Lock , lock.acquire()…try: … finally: lock.release() 标准模式,因为进程之间需要保护对系统中唯一资源的竞争
在 Jupyter 中使用 Process 的问题
如果直接把上面的代码放到 JpyterLab 中去执行,将会看到这样的错误
Traceback (most recent call last):File "", line 1, in <module>File "/Users/yanbin/jupyterlab-venv/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_mainexitcode &#61; _main(fd, parent_sentinel)File "/Users/yanbin/jupyterlab-venv/lib/python3.9/multiprocessing/spawn.py", line 126, in _mainself &#61; reduction.pickle.load(from_parent)
AttributeError: Can&#39;t get attribute &#39;job&#39; on __main__&#39; (built-in)>
Traceback ( most recent call last ) :File "" , line 1 , in & lt ; module & gt ;File "/Users/yanbin/jupyterlab-venv/lib/python3.9/multiprocessing/spawn.py" , line 116 , in spawn_mainexitcode &#61; _main ( fd , parent_sentinel )File "/Users/yanbin/jupyterlab-venv/lib/python3.9/multiprocessing/spawn.py" , line 126 , in _mainself &#61; reduction . pickle . load ( from_parent )AttributeError : Can &#39;t get attribute &#39; job &#39; on <module &#39; __main_ _ &#39; ( built - in ) & gt ;
这不是 Python 版本的问题&#xff0c;放别的 Python 3.8 下也是这样的错误&#xff0c;原因就是 Jupyter 无法与 multiprocessing 一同工作&#xff0c;pickle 模块在序列化数据向进程发送时出异常。解决办法是要用 multiprocess 替换掉 multiprocessing 模块
pip install multiprocess from multiprocess import Process
然后在 JupyterLab 中执行替换成 multiprocess 的版本&#xff0c;输出略有不同
[Child][2021-09-02 19:41:55.326549] main process 62917 done [Child][2021-09-02 19:41:56.335774] [Child][2021-09-02 19:41:57.342169] sub process 68144 bob done
与在 Python 终端执行的一个区别是&#xff0c;子进程总有一行在 main process … 之前输出&#xff0c;这没什么要紧的。
使用进程池
有线程池&#xff0c;相应的也有进程池&#xff0c;参照一个官方文档中的简单例子
from multiprocessing import Pool
import osdef f(x):print(f&#39;subprocess id: {os.getpid()}&#39;)return x*xif __name__ &#61;&#61; &#39;__main__&#39;:with Pool(5) as p:print(p.map(f, [1, 2, 3]))
from multiprocessing import Poolimport osdef f ( x ) :print ( f &#39;subprocess id: {os.getpid()}&#39; )return x* xif __name__ &#61;&#61; &#39;__main__&#39; :with Pool ( 5 ) as p :print ( p . map ( f , [ 1 , 2 , 3 ] ) )
输出为
subprocess id: 69348 subprocess id: 69350 subprocess id: 69347 [1, 4, 9]
这是用到了 Context 来管理进程池&#xff0c;如果逐步操作就是
pool &#61; Pool(5)
[pool.apply_async(f, args&#61;(i, )) for i in (1, 2, 3)]
pool.close()
pool.join()
pool &#61; Pool ( 5 )[ pool . apply_async ( f , args &#61; ( i , ) ) for i in ( 1 , 2 , 3 ) ]pool . close ( )pool . join ( )
进程池执行器
这个翻译有点别扭&#xff0c;直接叫 ProcessPoolExecutor 习惯些。
ProcessPoolExecutor 在 concurrent.futures 模块中&#xff0c;它是 Python 3.2 加入进来的。至于用法呢&#xff0c;只要想像它是 ThreadPoolExecutor 的进程版本就差不多了。它提供的方法用
- submit(fn, /, *args, **kwargs): 向进程池提交任务
- map(func, *iterables, timeout&#61;None, chunksize&#61;1): 批量提交任务的快捷写法
- shutdown(wait&#61;True, *, cancel_futures&#61;False): 关闭进程池
首先仍然用一个使用了 with 关键字的写法
from concurrent.futures import ProcessPoolExecutor
import osdef f(x):return f&#39;{os.getpid()}: {x*x}&#39;if __name__ &#61;&#61; &#39;__main__&#39;:with ProcessPoolExecutor(max_workers&#61;5) as executor:results &#61; executor.map(f, [1, 3, 4])for i in results:print(i)
from concurrent . futures import ProcessPoolExecutorimport osdef f ( x ) :return f &#39;{os.getpid()}: {x*x}&#39;if __name__ &#61;&#61; &#39;__main__&#39; :with ProcessPoolExecutor ( max_workers &#61; 5 ) as executor :results &#61; executor . map ( f , [ 1 , 3 , 4 ] )for i in results :print ( i )
输出如下&#xff1a;
也可以用 submit() 函数来提交任务&#xff0c;得到的是一个 Future。关于 Future, 以及类似的 submit(), executor.map() 函数在Python 多线程编程 有所覆盖。
另外&#xff0c;在构建 ProcessPoolExecutor 时如果不指 max_workers 参数将会取系统 CPU 的内核数(multiprocessing.cpu_count())。
如果不对 ProcessPoolExecutor 使用 with 语句&#xff0c;则需要去用 submit() 提交的任务进行 wait&#xff0c;参照
如果对软件测试、接口测试、自动化测试、持续集成、面试经验。感兴趣可以进到806549072&#xff0c;群内会有不定期的分享测试资料。还会有技术大牛&#xff0c;业内同行一起交流技术