作者:速度向前迈进 | 来源:互联网 | 2023-07-09 11:11
为什么使用concurrent.futures模块的以下Python代码永远挂起?
import concurrent.futures
class A:
def f(self):
print("called")
class B(A):
def f(self):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
executor.submit(super().f)
if __name__ == "__main__":
B().f()
该调用引发了一个不可见的异常[Errno 24]太多打开的文件(要查看它,用print执行器替换行executor.submit(super().f)(executor.submit(super().f).exception()) ).
但是,用ThreadPoolExecutor替换ProcessPoolExecutor会按预期打印“被调用”.
为什么使用multiprocessing.pool模块的以下Python代码会引发异常AssertionError:守护进程不允许有子进程?
import multiprocessing.pool
class A:
def f(self):
print("called")
class B(A):
def f(self):
pool = multiprocessing.pool.Pool(2)
pool.apply(super().f)
if __name__ == "__main__":
B().f()
但是,用ThreadPool替换Pool会按预期打印“被调用”.
环境:CPython 3.7,MacOS 10.14.
解决方法:
concurrent.futures.ProcessPoolExecutor和multiprocessing.pool.Pool使用multiprocessing.queues.Queue将工作函数对象从调用者传递给工作进程,Queue使用pickle模块来序列化/反序列化,但是它无法正确处理带子类的绑定方法对象例如:
f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)
输出:
>
>
A.f变为B.f,这有效地在工作进程中创建无限递归调用B.f到B.f.
pickle.dumps利用绑定方法对象的__reduce__方法,IMO,its implementation,没有考虑这个场景,它没有处理真正的func对象,而只是尝试从实例自我obj(B())返回简单的名称(f),这导致Bf,很可能是一个错误.
好消息是,我们知道问题出在哪里,我们可以通过实现我们自己的简化函数来修复它,该函数尝试从原始函数(A.f)和实例obj(B())重新创建绑定方法对象:
import types
import copyreg
import multiprocessing
def my_reduce(obj):
return (obj.__func__.__get__, (obj.__self__,))
copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)
我们可以这样做,因为bound方法是一个描述符.
ps:我已经提交了a bug report.