热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Python]future线程处理并发的核心源码详解

ThreadPoolExecutor核心工作详解ThreadPoolExecutor内部是如何工作的?什么是future对象?线程什么时候被创建&#x

ThreadPoolExecutor 核心工作详解


  1. ThreadPoolExecutor内部是如何工作的?
  2. 什么是future对象?
  3. 线程什么时候被创建?
  4. 任务是如何被线程处理的?
  5. 结束之后如何销毁的

1. ThreadPoolExecutor

我们所有的工作都是在ThreadPoolExecutor完成的:

with futures.ThreadPoolExecutor(max_workers=2) as executor:...

executor中定义了几个关键的变量。

class ThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix='', ...):self._max_workers = max_workers # 最大线程数self._work_queue = queue.SimpleQueue() # 任务池Queue,所有线程从该Queue中取任务然后执行self._threads = set() # 线程池

2. executor.submit() 的核心

future对象是我们通过调用submit()时返回的。

future = executor.submit(sleep_and_print, _i) # submit

看看submit() 做了哪些事情:

# def submit(*args, **kwargs)对参数进行了处理,实际上相当于这样:
def submit(self, fn, *args, **kwargs): ...# 下面是核心代码f = _base.Future() # 创建一个Future()对象w = _WorkItem(f, fn, args, kwargs) # 创建一个_WorkItem,传入可调用的fn和fn的所有参数self._work_queue.put(w) # 将_WorkItem加入_work_queueself._adjust_thread_count() # 调整线程数量,线程就是在这个函数中被创建的return f # 返回future

3. Future 对象

future对象初始化时,定义了几个实例变量。

class Future(object):"""代表一个异步计算的结果"""def __init__(self):"""Initializes the future. Should not be called by clients."""self._condition = threading.Condition()self._state = PENDING # 状态。self._result = None # 结果。处理调用方传入的fn的结果,_result = fn(*args, **kwargs)self._exception = None # 异常值。处理fn时,若fn抛出异常,则设置改异常值。self._done_callbacks = [] # 回调函数数组。用于处理add_done_callback传入的可调用对象。

future有五种状态:

_FUTURE_STATES = [PENDING,RUNNING,CANCELLED,CANCELLED_AND_NOTIFIED,FINISHED
]

4. WorkItem 对象

WorkItem和future是一对一的,submit()会为每一个future都创建一个WorkItem。WorkItem记录待处理的任务fn及其参数,线程会从work_queue中取出WorkItem,调用它的run()方法。WorkItem只定义了一个run()方法:

class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = future # futureself.fn = fn # fnself.args = args # fn的参数self.kwargs = kwargs # fn的参数def run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs) # 调用fnexcept BaseException as exc:self.future.set_exception(exc) # 如果fn有报错,设置错误类型给future# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result) # 将fn的返回值,设置给future

5. executor._adjust_thread_count 函数

def _adjust_thread_count(self):def weakref_cb(_, q&#61;self._work_queue): # 这个函数貌似没什么用&#xff0c;后文分析...num_threads &#61; len(self._threads) # _threads是已经创建的线程集合(set)if num_threads < self._max_workers: # 如果已创建的线程数小于最大线程数thread_name &#61; &#39;%s_%d&#39; % (self._thread_name_prefix or self,num_threads) # 线程名&#61;线程名前缀&#43;线程序号t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,args&#61;(weakref.ref(self, weakref_cb),self._work_queue,...)) # 创建线程&#xff0c;主要的两个参数是weakref.ref(self, weakref_cb), self._work_queue。前者是当前executor的弱引用&#xff0c;后者是executor的任务池t.daemon &#61; True # 守护线程。主线程退出时&#xff0c;守护线程会自动退出t.start() # 启动线程self._threads.add(t) # 加入已创建的线程集合_threads_queues[t] &#61; self._work_queue # _threads_queues是一个全局的弱引用WeakKeyDictionary&#xff0c;记录所有executor的任务池&#xff0c;以便退出时进行清理操作

t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,)

这个 target 指向的 _worker 函数很关键&#xff0c;它会执行一个 while True 循环&#xff0c;从 work_queue 中不断获取 work_item&#xff0c; 然后执行 work_item 的 run() 方法&#xff1a;

def _worker(..., work_queue, ...):try:while True:work_item &#61; work_queue.get(block&#61;True) # 从 work_queue 中取一个 _WorkItemif work_item is not None:work_item.run() # 调用 _WorkItem.run()# del 删除对象的引用&#xff0c;会将 x 的引用计数减一&#xff0c;当引用计数为0时&#xff0c;调用 x.__del__() 销毁实例。del work_itemcontinue...except BaseException:_base.LOGGER.critical(&#39;Exception in worker&#39;, exc_info&#61;True)

这里没有贴出线程退出 While True 的循环&#xff0c;是不想代码过于复杂了。如果感兴趣&#xff0c;可以自己查看源码。

如果你对 weakref.ref(self, weakref_cb) 不理解&#xff0c;下面我讲关键代码提炼出来&#xff0c;模拟该弱引用&#xff1a;

不知道什么是弱引用&#xff0c;请先参考文章&#xff1a;[Python] 高级用法 - 弱引用详解

import weakrefdef _worker(executor_reference):print(&#39;Enter function: _worker&#39;)print(executor_reference)executor &#61; executor_reference()print(executor)if executor is not None:executor.say_hello()def run():a &#61; A(&#39;DemoA&#39;)def weakref_cb(_):print(&#39;Enter function: weakref_cb&#39;)print((weakref.ref(a, weakref_cb)))print(weakref.ref(a))_worker((weakref.ref(a, weakref_cb))) # 两行代码效果相同_worker(weakref.ref(a)) # 两行代码效果相同class A:def __init__(self, name):self.name &#61; namedef say_hello(self):print(f"Hello, I am {self.name}.")if __name__ &#61;&#61; &#39;__main__&#39;:run()

weakref_cb 并没有被执行&#xff1a;

<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.

6. executor 如何退出

使用 with 关键字&#xff0c;在结束时会调用__exit__方法&#xff0c;这个方法调用了 shutdown() 方法。 shutdown() 会做两件事&#xff1a;第一&#xff0c;给 work_queue 传入一个 None 值&#xff0c;线程执行时&#xff0c;获取到 None 就会退出 While True 循环&#xff1b; 第二&#xff0c;如果 wait 为 True&#xff0c;将剩余所有未执行完的线程&#xff0c;调用它们的 join() 方法&#xff0c;让主线程等待它们完成&#xff1a;

class Executor(object):...def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.shutdown(wait&#61;True)return Falseclass ThreadPoolExecutor(_base.Executor):...def shutdown(self, wait&#61;True):with self._shutdown_lock:self._shutdown &#61; Trueself._work_queue.put(None)if wait:for t in self._threads:t.join()

推荐阅读
  • 使用Matlab创建动态GIF动画
    动态GIF图可以有效增强数据表达的直观性和吸引力。本文将详细介绍如何利用Matlab软件生成动态GIF图,涵盖基本代码实现与高级应用技巧。 ... [详细]
  • 本文详细介绍了在Luat OS中如何实现C与Lua的混合编程,包括在C环境中运行Lua脚本、封装可被Lua调用的C语言库,以及C与Lua之间的数据交互方法。 ... [详细]
  • 根据官方定义,RxJava是一种用于异步编程和可观察数据流的API。其核心特性在于流式处理能力和丰富的操作符支持。 ... [详细]
  • Java 并发编程:RunnableScheduledFuture 接口详解
    本文深入解析了 Java 并发编程中 RunnableScheduledFuture 接口的源代码及其在标准线程池中的应用。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • 基于SSM框架的在线考试系统:随机组卷功能详解
    本文深入探讨了基于SSM(Spring, Spring MVC, MyBatis)框架构建的在线考试系统中,随机组卷功能的设计与实现方法。 ... [详细]
  • 深入解析 C++ 中的 String 和 Vector
    本文详细介绍了 C++ 编程语言中 String 和 Vector 的使用方法及特性,旨在帮助开发者更好地理解和应用这两个重要的容器。 ... [详细]
  • 本文探讨了异步编程的发展历程,从最初的AJAX异步回调到现代的Promise、Generator+Co以及Async/Await等技术。文章详细分析了Promise的工作原理及其源码实现,帮助开发者更好地理解和使用这一重要工具。 ... [详细]
  • Requests库的基本使用方法
    本文介绍了Python中Requests库的基础用法,包括如何安装、GET和POST请求的实现、如何处理Cookies和Headers,以及如何解析JSON响应。相比urllib库,Requests库提供了更为简洁高效的接口来处理HTTP请求。 ... [详细]
  • 深入理解Java多线程与并发机制
    本文探讨了Java多线程和并发机制的核心概念,包括多线程类的分类、执行器框架、并发容器及控制工具。通过详细解析这些组件,帮助开发者更好地理解和应用多线程技术。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • POJ 2482 星空中的星星:利用线段树与扫描线算法解决
    在《POJ 2482 星空中的星星》问题中,通过运用线段树和扫描线算法,可以高效地解决星星在窗口内的计数问题。该方法不仅能够快速处理大规模数据,还能确保时间复杂度的最优性,适用于各种复杂的星空模拟场景。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
author-avatar
晓亮居士_264
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有