作者:依一勇婷16_639 | 来源:互联网 | 2024-10-31 11:58
本文深入探讨了Python线程池的内部实现机制,作为对Apshceduler调度器研究的延伸。在先前关于Apshceduler源码分析的文章中,我们提到调度器通过`def_do_submit_`函数将任务提交到线程池。本文将进一步解析线程池的工作原理,包括任务分配、线程管理及性能优化等方面,为读者提供更全面的技术理解。
对Python线程池的研究是之前对Apshceduler分析的附加工作。
在之前对Apshceduler源码分析的文章中,写到调度器将任务放入线程池的函数
def _do_submit_job(self, job, run_times):
def callback(f):
exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
(f.exception(), getattr(f.exception(), '__traceback__', None)))
if exc:
self._run_job_error(job.id, exc, tb)
else:
self._run_job_success(job.id, f.result())
f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name)
f.add_done_callback(callback)
这里分析的线程池类是concurrent.futures.ThreadPoolExecutor,也就是上述代码中self._pool所使用的类。先上self._pool.submit函数的代码,再做详细分析
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
f和w是两个非常重要的变量,f作为submit返回的对象,submit函数的调用者可以对其添加回调,待fn执行完成后,会在当前线程执行,具体是如何实现的,这里先不说,下面再详细分析;w则是封装了线程需要执行的方法和参数,通过self._work_queue.put(w)方法放入一个队列当中。
self._adjust_thread_count()方法则是检查当前线程池的线程数量,如果小于设定的最大值,就开辟一个线程,代码就不上了,直接看这些个线程都是干嘛的
def _worker(executor_reference, work_queue):
try:
while True:
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See issue16284
del work_item
continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
work_queue.put(None)
return
del executor
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
这些线程就是一个死循环,不断的从任务队列中获取到_WorkItem,然后通过其封装方法,执行我们需要的任务。如果取到的任务为None,就往队列中再放入一个None,以通知其它线程结束,然后结束当前循环。
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
else:
self.future.set_result(result)
如果没有异常,执行结束后,会执行之前我们说的回调。在self.future.set_result(result)方法中会执行任务回调,当然了,是在当前线程中。如果需要写入数据库之类的操作,不建议在回调中直接写入。