1. 模块概述
multiprocessing模块允许Python程序生成子进程,实现并行处理任务。与threading模块不同,multiprocessing模块不受全局解释器锁(GIL)的影响,这意味着它可以充分利用多核CPU的能力。此外,该模块提供了丰富的API来管理进程,包括进程间通信、同步机制等。
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(square, [1, 2, 3]))
上述代码展示了如何使用Pool类来并行计算一系列数值的平方。Pool类创建了一个进程池,可以指定最大进程数,map方法则将任务分发给这些进程处理。
2. 创建和管理进程
除了Pool类,multiprocessing模块还提供了Process类来直接创建和控制单个进程。
from multiprocessing import Process
def greet(name):
print('Hello', name)
if __name__ == '__main__':
process = Process(target=greet, args=('John',))
process.start()
process.join()
在这个例子中,我们定义了一个greet函数,并通过Process类创建了一个新进程来执行这个函数。start方法启动进程,join方法确保主程序等待子进程完成后再继续执行。
3. 进程信息获取
有时候,了解进程的运行环境和状态是非常有用的。multiprocessing模块提供了获取当前进程及其父进程ID的方法。
import os
from multiprocessing import Process
def info(title):
print(title)
print('Module name:', __name__)
if hasattr(os, 'getppid'):
print('Parent process ID:', os.getppid())
print('Process ID:', os.getpid())
def greet(name):
info('Function greet')
print('Hello', name)
if __name__ == '__main__':
info('Main line')
process = Process(target=greet, args=('John',))
process.start()
process.join()
此示例展示了如何在主程序和子进程中打印相关信息。
4. 进程间通信
multiprocessing模块支持多种进程间通信方式,包括Queue和Pipe。Queue是一种线程和进程安全的数据结构,适合用于进程间的简单数据交换。
from multiprocessing import Process, Queue
def put_data(queue):
queue.put([42, None, 'hello'])
if __name__ == '__main__':
queue = Queue()
process = Process(target=put_data, args=(queue,))
process.start()
print(queue.get())
process.join()
Pipe则提供了一种更灵活的双向通信机制,适用于需要更复杂交互场景的情况。
from multiprocessing import Process, Pipe
def send_data(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_cOnn= Pipe()
process = Process(target=send_data, args=(child_conn,))
process.start()
print(parent_conn.recv())
process.join()
5. 进程同步
为了防止多个进程同时访问同一资源导致的问题,multiprocessing模块提供了Lock类来实现进程间的同步。
from multiprocessing import Process, Lock
def print_message(lock, i):
lock.acquire()
print('Hello world', i)
lock.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=print_message, args=(lock, num)).start()
6. 共享状态信息
虽然在多进程编程中应尽量避免共享状态,但multiprocessing模块还是提供了几种方法来实现数据共享。
共享内存
from multiprocessing import Process, Value, Array
def modify_shared(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
process = Process(target=modify_shared, args=(num, arr))
process.start()
process.join()
print(num.value)
print(arr[:])
共享数据结构
from multiprocessing import Process, Manager
def update_data(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
manager = Manager()
dictiOnary= manager.dict()
list_ = manager.list(range(10))
process = Process(target=update_data, args=(dictionary, list_))
process.start()
process.join()
print(dictionary)
print(list_)
7. 使用工作进程池
Pool类提供了一种高效的方式来管理和调度多个工作进程,特别适合于处理大量小型任务。
from multiprocessing import Pool, TimeoutError
import time
import os
def square(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
print(pool.map(square, range(10)))
for result in pool.imap_unordered(square, range(10)):
print(result)
async_result = pool.apply_async(square, (20,))
print(async_result.get(timeout=1))
async_pid = pool.apply_async(os.getpid, ())
print(async_pid.get(timeout=1))
multiple_pids = [pool.apply_async(os.getpid, ()) for _ in range(4)]
print([pid.get(timeout=1) for pid in multiple_pids])
sleep_result = pool.apply_async(time.sleep, (10,))
try:
print(sleep_result.get(timeout=1))
except TimeoutError:
print("The operation timed out")
以上代码演示了如何使用Pool类执行同步和异步任务,以及如何处理超时情况。
参考文档:官方文档