Author: 晓亮居士_264 | Source: Internet | 2023-10-13 13:35
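ThreadPoolExecutor Core Internals Explained

How does ThreadPoolExecutor work internally? What is a future object? When are threads created? How do threads process tasks? How is everything torn down when the work is finished?

1. ThreadPoolExecutor

All of our work is done inside a ThreadPoolExecutor: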
```python
from concurrent import futures

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    ...
```
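The executor defines a few key instance variables: _max_workers caps the number of worker threads, _work_queue holds the submitted work items, and _threads records the threads started so far.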
```python
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, thread_name_prefix='', ...):
        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._threads = set()
```
2. The core of executor.submit()

The future object is what submit() returns to us:
```python
future = executor.submit(sleep_and_print, _i)
```
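The article never shows sleep_and_print, so here is a minimal runnable sketch with a hypothetical sleep_and_print (it sleeps briefly, prints, and returns the square of its argument) to make the submit() call above concrete. Calling result() on each future blocks until that task is done, and the results come back in submission order even if the tasks finish in a different order.

```python
import time
from concurrent import futures


def sleep_and_print(i):
    # Hypothetical task: the article does not show the real body of sleep_and_print
    time.sleep(0.1)
    print(f"task {i} finished")
    return i * i


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    fs = [executor.submit(sleep_and_print, _i) for _i in range(4)]
    results = [f.result() for f in fs]   # result() blocks until each task is done

print(results)  # [0, 1, 4, 9]
```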
Let's look at what submit() does:
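```python
def submit(self, fn, *args, **kwargs):
    ...
    f = _base.Future()                  # the future handed back to the caller
    w = _WorkItem(f, fn, args, kwargs)  # bundle the future with the call to make
    self._work_queue.put(w)             # enqueue it for a worker thread
    self._adjust_thread_count()         # start a new thread if the pool is not full
    return f                            # return at once, without waiting for fn
```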
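To see that submit() only enqueues the work and returns the future right away, here is a small sketch. It uses a threading.Event's wait method as the task (an illustrative choice, not from the article) so the task is guaranteed to still be blocked when submit() returns.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()

with ThreadPoolExecutor(max_workers=1) as executor:
    # submit() wraps, enqueues and maybe starts a thread; it never waits for fn
    future = executor.submit(gate.wait)
    done_right_after_submit = future.done()   # False: the task is blocked on the gate
    gate.set()                                # let the task finish
    print(done_right_after_submit, future.result())  # False True
```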
3. The Future object

When a future is initialized, it sets up several instance variables:
```python
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()  # guards the state; result() waits on it
        self._state = PENDING
        self._result = None
        self._exception = None
        self._done_callbacks = []
```
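The _condition variable is what lets result() block until another thread calls set_result(). The MiniFuture below is a hypothetical, heavily simplified class (no cancellation, no callbacks, no state machine) that sketches this Condition-based hand-off; it is not the stdlib Future.

```python
import threading


class MiniFuture:
    """Hypothetical, simplified future: a Condition guards the result hand-off."""

    def __init__(self):
        self._condition = threading.Condition()
        self._done = False
        self._result = None
        self._exception = None

    def set_result(self, result):
        with self._condition:
            self._result = result
            self._done = True
            self._condition.notify_all()   # wake every thread blocked in result()

    def set_exception(self, exc):
        with self._condition:
            self._exception = exc
            self._done = True
            self._condition.notify_all()

    def result(self, timeout=None):
        with self._condition:
            # wait_for() releases the lock while waiting and re-checks the predicate
            if not self._condition.wait_for(lambda: self._done, timeout):
                raise TimeoutError()
            if self._exception is not None:
                raise self._exception
            return self._result


f = MiniFuture()
# Another thread delivers the result 0.1 s later; result() blocks until then
threading.Timer(0.1, f.set_result, args=(42,)).start()
print(f.result())  # 42

g = MiniFuture()
try:
    g.result(timeout=0.05)   # nobody ever calls set_result() on g
    timed_out = False
except TimeoutError:
    timed_out = True
print(timed_out)  # True
```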
A future can be in one of five states:
```python
_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]
```
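These state transitions can be observed on a real concurrent.futures.Future. The sketch below creates futures by hand (the documentation reserves that for executors and tests) and reads the private _state attribute, a CPython implementation detail that may change between versions.

```python
from concurrent.futures import Future

states = []
seen = []

f = Future()                      # created by hand here only for illustration
states.append(f._state)           # 'PENDING'
f.add_done_callback(lambda fut: seen.append(fut._state))   # stored in _done_callbacks
f.set_running_or_notify_cancel()  # PENDING -> RUNNING (what a worker does first)
states.append(f._state)           # 'RUNNING'
cancel_running = f.cancel()       # False: a running future cannot be cancelled
f.set_result(42)                  # RUNNING -> FINISHED, then runs the done callbacks
states.append(f._state)           # 'FINISHED'
print(states, seen, cancel_running, f.result())
# ['PENDING', 'RUNNING', 'FINISHED'] ['FINISHED'] False 42

g = Future()
g.cancel()                                  # PENDING -> CANCELLED
started = g.set_running_or_notify_cancel()  # CANCELLED -> CANCELLED_AND_NOTIFIED
print(started, g._state)                    # False CANCELLED_AND_NOTIFIED
```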
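4. The WorkItem object

WorkItems and futures are one-to-one: submit() creates one WorkItem for every future. A WorkItem records the task fn and its arguments; a worker thread takes WorkItems off work_queue and calls their run() method. run() is the only method WorkItem defines: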
```python
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Skip the call entirely if the future was cancelled while queued
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)
```
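Because _WorkItem is private, the sketch below re-implements the same run() logic in a hypothetical WorkItem class and drives it by hand against real Future objects, showing the three possible outcomes: a normal result, an exception captured on the future, and a cancelled future whose fn is never called.

```python
from concurrent.futures import Future


class WorkItem:
    """Hypothetical stand-in for the private _WorkItem class shown above."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Returns early (fn is skipped) if the future was cancelled while queued
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)   # the caller sees it via future.result()
        else:
            self.future.set_result(result)


ok = Future()
WorkItem(ok, pow, (2, 10), {}).run()
print(ok.result())                          # 1024

bad = Future()
WorkItem(bad, int, ('not a number',), {}).run()
print(type(bad.exception()).__name__)       # ValueError

calls = []
skipped = Future()
skipped.cancel()                            # cancelled while still waiting in the queue
WorkItem(skipped, calls.append, ('ran',), {}).run()
print(skipped.cancelled(), calls)           # True []
```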
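5. The executor._adjust_thread_count function

submit() calls _adjust_thread_count() after every enqueue. While the pool holds fewer than _max_workers threads, it starts one new daemon thread and records it in self._threads, so threads are created lazily as work arrives rather than all at once when the executor is constructed (newer Python versions additionally skip this step when an idle thread is available):

```python
def _adjust_thread_count(self):
    def weakref_cb(_, q=self._work_queue):
        ...
    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue,
                                   ...))
        t.daemon = True
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
```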
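The lazy, one-thread-per-submit() growth can be checked directly. This sketch reads executor._threads, a private CPython attribute used here only for illustration. The tasks block on an Event on purpose: on Python 3.8+, _adjust_thread_count() does not start a new thread if an idle one is available, so busy workers keep the count predictable.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()


def wait_for_gate(i):
    gate.wait()      # keep the worker busy so it never counts as idle
    return i


executor = ThreadPoolExecutor(max_workers=2)
counts = [len(executor._threads)]          # no threads exist before the first submit()
fs = []
for i in range(4):
    fs.append(executor.submit(wait_for_gate, i))
    counts.append(len(executor._threads))  # grows by one per submit() up to max_workers
gate.set()
results = [f.result() for f in fs]
executor.shutdown(wait=True)
print(counts)   # [0, 1, 2, 2, 2]
```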
```python
t = threading.Thread(name=thread_name, target=_worker)  # other arguments omitted
```
The _worker function that this target points to is the key piece: it runs a while True loop that keeps taking a work_item from work_queue and calling the work_item's run() method:
```python
def _worker(..., work_queue, ...):
    try:
        while True:
            # Block until a work item (or the None shutdown sentinel) arrives
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Drop the reference so the finished work item can be freed
                del work_item
                continue
            ...
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
```
The part of the loop that lets a thread leave while True is omitted here to keep the code simple; if you're interested, read the source code.
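The omitted exit path relies on the None sentinel that shutdown() puts on the queue (section 6). Here is a simplified, hypothetical worker loop (not CPython's _worker) showing the idea: a worker returns when it dequeues None, and it puts the None back first so the other workers stop too.

```python
import queue
import threading


def worker(work_queue, results):
    # Simplified worker loop in the spirit of _worker (hypothetical, not CPython's code)
    while True:
        item = work_queue.get(block=True)
        if item is None:
            work_queue.put(None)   # pass the sentinel on so other workers also stop
            return                 # leave the while True loop: the thread ends
        fn, arg = item
        results.append(fn(arg))


work_queue = queue.SimpleQueue()
results = []
threads = [threading.Thread(target=worker, args=(work_queue, results)) for _ in range(2)]
for t in threads:
    t.start()

for i in range(5):
    work_queue.put((abs, -i))
work_queue.put(None)   # a single sentinel, queued after all real work

for t in threads:
    t.join()           # returns because both workers eventually see None
print(sorted(results))  # [0, 1, 2, 3, 4]
```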
If weakref.ref(self, weakref_cb) is unfamiliar, I've extracted the key code below to simulate this weak reference:
If you don't know what a weak reference is, please read this article first: [Python] 高级用法 - 弱引用详解 ([Python] Advanced Usage: Weak References Explained)
```python
import weakref


def _worker(executor_reference):
    print('Enter function: _worker')
    print(executor_reference)
    # Calling the weakref returns the referent, or None if it has been collected
    executor = executor_reference()
    print(executor)
    if executor is not None:
        executor.say_hello()


def run():
    a = A('DemoA')

    def weakref_cb(_):
        print('Enter function: weakref_cb')

    print(weakref.ref(a, weakref_cb))
    print(weakref.ref(a))
    _worker(weakref.ref(a, weakref_cb))
    _worker(weakref.ref(a))


class A:
    def __init__(self, name):
        self.name = name

    def say_hello(self):
        print(f"Hello, I am {self.name}.")


if __name__ == '__main__':
    run()
```
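weakref_cb is never executed. A weakref callback only fires if the weakref object itself is still alive when its referent is finalized, and here every weakref is a temporary that is discarded before `a` goes away at the end of run():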
```text
<weakref at 0x0000027068011958; to 'A' at 0x000002707F073848>
<weakref at 0x0000027068011958; to 'A' at 0x000002707F073848>
Enter function: _worker
<weakref at 0x0000027068011958; to 'A' at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
Enter function: _worker
<weakref at 0x0000027068011958; to 'A' at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
```
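For contrast, here is a sketch of both cases side by side: the callback does fire when the weakref object is kept alive and the last strong reference is dropped, and it does not fire when the weakref is a discarded temporary. Keeping such a weakref alive inside the worker thread is what gives the pool a hook to react when the executor object itself is garbage collected.

```python
import gc
import weakref


class A:
    def __init__(self, name):
        self.name = name


events = []


def weakref_cb(ref):
    # Receives the weakref object itself; the referent is no longer reachable
    events.append(('callback', ref() is None))


# Case 1: keep the weakref alive, then drop the last strong reference
a = A('DemoA')
ref = weakref.ref(a, weakref_cb)
events.append(('alive', ref() is a))
del a
gc.collect()   # CPython already freed `a` at `del` via refcounting; this is a safety net
events.append(('after del', ref() is None))

# Case 2: the weakref is a temporary that dies before its referent, as in run() above
b = A('DemoB')
weakref.ref(b, weakref_cb)   # this weakref object is discarded immediately
del b
gc.collect()

print(events)
# [('alive', True), ('callback', True), ('after del', True)]
```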
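6. How the executor exits

When a with block ends, __exit__ is called, and it calls shutdown(). shutdown() does two things. First, it puts a None into work_queue; a worker thread that receives None exits its while True loop (in the CPython source, that worker puts None back on the queue before returning, so a single None reaches every worker in turn). Second, if wait is True, it calls join() on every thread in the pool, so the main thread waits until they have all finished: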
```python
class Executor(object):
    ...
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False


class ThreadPoolExecutor(_base.Executor):
    ...
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)   # sentinel: wakes workers so they can exit
        if wait:
            for t in self._threads:
                t.join()                 # block until each worker thread has ended
```
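A quick sketch of this shutdown behavior through the public API: leaving the with block waits for every queued task to run, and afterwards the executor refuses new work (submit() raises RuntimeError). The slow_task function is hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []


def slow_task(i):
    # Hypothetical task that takes a moment, then records that it ran
    time.sleep(0.05)
    finished.append(i)
    return i


with ThreadPoolExecutor(max_workers=2) as executor:
    for i in range(4):
        executor.submit(slow_task, i)
# __exit__ -> shutdown(wait=True): the queued tasks all ran before the joins returned
print(sorted(finished))  # [0, 1, 2, 3]

try:
    executor.submit(slow_task, 99)
    rejected = False
except RuntimeError:
    rejected = True   # new work is refused once _shutdown is set
print(rejected)  # True
```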