热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Python]future线程处理并发的核心源码详解

ThreadPoolExecutor核心工作详解ThreadPoolExecutor内部是如何工作的?什么是future对象?线程什么时候被创建&#x

ThreadPoolExecutor 核心工作详解


  1. ThreadPoolExecutor内部是如何工作的?
  2. 什么是future对象?
  3. 线程什么时候被创建?
  4. 任务是如何被线程处理的?
  5. 结束之后如何销毁的

1. ThreadPoolExecutor

我们所有的工作都是在ThreadPoolExecutor完成的:

with futures.ThreadPoolExecutor(max_workers=2) as executor:...

executor中定义了几个关键的变量。

class ThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix='', ...):self._max_workers = max_workers # 最大线程数self._work_queue = queue.SimpleQueue() # 任务池Queue,所有线程从该Queue中取任务然后执行self._threads = set() # 线程池

2. executor.submit() 的核心

future对象是我们通过调用submit()时返回的。

future = executor.submit(sleep_and_print, _i) # submit

看看submit() 做了哪些事情:

# def submit(*args, **kwargs)对参数进行了处理,实际上相当于这样:
def submit(self, fn, *args, **kwargs): ...# 下面是核心代码f = _base.Future() # 创建一个Future()对象w = _WorkItem(f, fn, args, kwargs) # 创建一个_WorkItem,传入可调用的fn和fn的所有参数self._work_queue.put(w) # 将_WorkItem加入_work_queueself._adjust_thread_count() # 调整线程数量,线程就是在这个函数中被创建的return f # 返回future

3. Future 对象

future对象初始化时,定义了几个实例变量。

class Future(object):"""代表一个异步计算的结果"""def __init__(self):"""Initializes the future. Should not be called by clients."""self._condition = threading.Condition()self._state = PENDING # 状态。self._result = None # 结果。处理调用方传入的fn的结果,_result = fn(*args, **kwargs)self._exception = None # 异常值。处理fn时,若fn抛出异常,则设置改异常值。self._done_callbacks = [] # 回调函数数组。用于处理add_done_callback传入的可调用对象。

future有五种状态:

_FUTURE_STATES = [PENDING,RUNNING,CANCELLED,CANCELLED_AND_NOTIFIED,FINISHED
]

4. WorkItem 对象

WorkItem和future是一对一的,submit()会为每一个future都创建一个WorkItem。WorkItem记录待处理的任务fn及其参数,线程会从work_queue中取出WorkItem,调用它的run()方法。WorkItem只定义了一个run()方法:

class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = future # futureself.fn = fn # fnself.args = args # fn的参数self.kwargs = kwargs # fn的参数def run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs) # 调用fnexcept BaseException as exc:self.future.set_exception(exc) # 如果fn有报错,设置错误类型给future# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result) # 将fn的返回值,设置给future

5. executor._adjust_thread_count 函数

def _adjust_thread_count(self):def weakref_cb(_, q&#61;self._work_queue): # 这个函数貌似没什么用&#xff0c;后文分析...num_threads &#61; len(self._threads) # _threads是已经创建的线程集合(set)if num_threads < self._max_workers: # 如果已创建的线程数小于最大线程数thread_name &#61; &#39;%s_%d&#39; % (self._thread_name_prefix or self,num_threads) # 线程名&#61;线程名前缀&#43;线程序号t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,args&#61;(weakref.ref(self, weakref_cb),self._work_queue,...)) # 创建线程&#xff0c;主要的两个参数是weakref.ref(self, weakref_cb), self._work_queue。前者是当前executor的弱引用&#xff0c;后者是executor的任务池t.daemon &#61; True # 守护线程。主线程退出时&#xff0c;守护线程会自动退出t.start() # 启动线程self._threads.add(t) # 加入已创建的线程集合_threads_queues[t] &#61; self._work_queue # _threads_queues是一个全局的弱引用WeakKeyDictionary&#xff0c;记录所有executor的任务池&#xff0c;以便退出时进行清理操作

t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,)

这个 target 指向的 _worker 函数很关键&#xff0c;它会执行一个 while True 循环&#xff0c;从 work_queue 中不断获取 work_item&#xff0c; 然后执行 work_item 的 run() 方法&#xff1a;

def _worker(..., work_queue, ...):try:while True:work_item &#61; work_queue.get(block&#61;True) # 从 work_queue 中取一个 _WorkItemif work_item is not None:work_item.run() # 调用 _WorkItem.run()# del 删除对象的引用&#xff0c;会将 x 的引用计数减一&#xff0c;当引用计数为0时&#xff0c;调用 x.__del__() 销毁实例。del work_itemcontinue...except BaseException:_base.LOGGER.critical(&#39;Exception in worker&#39;, exc_info&#61;True)

这里没有贴出线程退出 While True 的循环&#xff0c;是不想代码过于复杂了。如果感兴趣&#xff0c;可以自己查看源码。

如果你对 weakref.ref(self, weakref_cb) 不理解&#xff0c;下面我讲关键代码提炼出来&#xff0c;模拟该弱引用&#xff1a;

不知道什么是弱引用&#xff0c;请先参考文章&#xff1a;[Python] 高级用法 - 弱引用详解

import weakrefdef _worker(executor_reference):print(&#39;Enter function: _worker&#39;)print(executor_reference)executor &#61; executor_reference()print(executor)if executor is not None:executor.say_hello()def run():a &#61; A(&#39;DemoA&#39;)def weakref_cb(_):print(&#39;Enter function: weakref_cb&#39;)print((weakref.ref(a, weakref_cb)))print(weakref.ref(a))_worker((weakref.ref(a, weakref_cb))) # 两行代码效果相同_worker(weakref.ref(a)) # 两行代码效果相同class A:def __init__(self, name):self.name &#61; namedef say_hello(self):print(f"Hello, I am {self.name}.")if __name__ &#61;&#61; &#39;__main__&#39;:run()

weakref_cb 并没有被执行&#xff1a;

<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.

6. executor 如何退出

使用 with 关键字&#xff0c;在结束时会调用__exit__方法&#xff0c;这个方法调用了 shutdown() 方法。 shutdown() 会做两件事&#xff1a;第一&#xff0c;给 work_queue 传入一个 None 值&#xff0c;线程执行时&#xff0c;获取到 None 就会退出 While True 循环&#xff1b; 第二&#xff0c;如果 wait 为 True&#xff0c;将剩余所有未执行完的线程&#xff0c;调用它们的 join() 方法&#xff0c;让主线程等待它们完成&#xff1a;

class Executor(object):...def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.shutdown(wait&#61;True)return Falseclass ThreadPoolExecutor(_base.Executor):...def shutdown(self, wait&#61;True):with self._shutdown_lock:self._shutdown &#61; Trueself._work_queue.put(None)if wait:for t in self._threads:t.join()

推荐阅读
  • 深入解析Java枚举及其高级特性
    本文详细介绍了Java枚举的概念、语法、使用规则和应用场景,并探讨了其在实际编程中的高级应用。所有相关内容已收录于GitHub仓库[JavaLearningmanual](https://github.com/Ziphtracks/JavaLearningmanual),欢迎Star并持续关注。 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 本文详细解析了Java中hashCode()和equals()方法的实现原理及其在哈希表结构中的应用,探讨了两者之间的关系及其实现时需要注意的问题。 ... [详细]
  • 本文介绍如何从字符串中移除大写、小写、特殊、数字和非数字字符,并提供了多种编程语言的实现示例。 ... [详细]
  • 本文深入探讨了HTTP请求和响应对象的使用,详细介绍了如何通过响应对象向客户端发送数据、处理中文乱码问题以及常见的HTTP状态码。此外,还涵盖了文件下载、请求重定向、请求转发等高级功能。 ... [详细]
  • PHP 过滤器详解
    本文深入探讨了 PHP 中的过滤器机制,包括常见的 $_SERVER 变量、filter_has_var() 函数、filter_id() 函数、filter_input() 函数及其数组形式、filter_list() 函数以及 filter_var() 和其数组形式。同时,详细介绍了各种过滤器的用途和用法。 ... [详细]
  • 本文详细介绍了 org.apache.commons.io.IOCase 类中的 checkCompareTo() 方法,通过多个代码示例展示其在不同场景下的使用方法。 ... [详细]
  • 本文详细介绍了如何在 Objective-C 中使用 @public 和 @protected 修饰符来控制类成员的访问权限。同时,探讨了点语法和箭头操作符的区别,以及属性声明和实现的自动生成。 ... [详细]
  • 深入解析Redis内存对象模型
    本文详细介绍了Redis内存对象模型的关键知识点,包括内存统计、内存分配、数据存储细节及优化策略。通过实际案例和专业分析,帮助读者全面理解Redis内存管理机制。 ... [详细]
  • 对象自省自省在计算机编程领域里,是指在运行时判断一个对象的类型和能力。dir能够返回一个列表,列举了一个对象所拥有的属性和方法。my_list[ ... [详细]
  • JavaScript 基础语法指南
    本文详细介绍了 JavaScript 的基础语法,包括变量、数据类型、运算符、语句和函数等内容,旨在为初学者提供全面的入门指导。 ... [详细]
  • 在本教程中,我们将深入探讨如何使用 Python 构建游戏的主程序模块。通过逐步实现各个关键组件,最终完成一个功能完善的游戏界面。 ... [详细]
  • 2018-2019学年第六周《Java数据结构与算法》学习总结
    本文总结了2018-2019学年第六周在《Java数据结构与算法》课程中的学习内容,重点介绍了非线性数据结构——树的相关知识及其应用。 ... [详细]
  • 本文介绍如何使用 Angular 6 的 HttpClient 模块来获取 HTTP 响应头,包括代码示例和常见问题的解决方案。 ... [详细]
  • 本文探讨了如何通过预处理器开关选择不同的类实现,并解决在特定情况下遇到的链接器错误。 ... [详细]
author-avatar
晓亮居士_264
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有