热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Python]future线程处理并发的核心源码详解

ThreadPoolExecutor核心工作详解ThreadPoolExecutor内部是如何工作的?什么是future对象?线程什么时候被创建&#x

ThreadPoolExecutor 核心工作详解


  1. ThreadPoolExecutor内部是如何工作的?
  2. 什么是future对象?
  3. 线程什么时候被创建?
  4. 任务是如何被线程处理的?
  5. 结束之后如何销毁的

1. ThreadPoolExecutor

我们所有的工作都是在ThreadPoolExecutor完成的:

with futures.ThreadPoolExecutor(max_workers=2) as executor:...

executor中定义了几个关键的变量。

class ThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix='', ...):self._max_workers = max_workers # 最大线程数self._work_queue = queue.SimpleQueue() # 任务池Queue,所有线程从该Queue中取任务然后执行self._threads = set() # 线程池

2. executor.submit() 的核心

future对象是我们通过调用submit()时返回的。

future = executor.submit(sleep_and_print, _i) # submit

看看submit() 做了哪些事情:

# def submit(*args, **kwargs)对参数进行了处理,实际上相当于这样:
def submit(self, fn, *args, **kwargs): ...# 下面是核心代码f = _base.Future() # 创建一个Future()对象w = _WorkItem(f, fn, args, kwargs) # 创建一个_WorkItem,传入可调用的fn和fn的所有参数self._work_queue.put(w) # 将_WorkItem加入_work_queueself._adjust_thread_count() # 调整线程数量,线程就是在这个函数中被创建的return f # 返回future

3. Future 对象

future对象初始化时,定义了几个实例变量。

class Future(object):"""代表一个异步计算的结果"""def __init__(self):"""Initializes the future. Should not be called by clients."""self._condition = threading.Condition()self._state = PENDING # 状态。self._result = None # 结果。处理调用方传入的fn的结果,_result = fn(*args, **kwargs)self._exception = None # 异常值。处理fn时,若fn抛出异常,则设置改异常值。self._done_callbacks = [] # 回调函数数组。用于处理add_done_callback传入的可调用对象。

future有五种状态:

_FUTURE_STATES = [PENDING,RUNNING,CANCELLED,CANCELLED_AND_NOTIFIED,FINISHED
]

4. WorkItem 对象

WorkItem和future是一对一的,submit()会为每一个future都创建一个WorkItem。WorkItem记录待处理的任务fn及其参数,线程会从work_queue中取出WorkItem,调用它的run()方法。WorkItem只定义了一个run()方法:

class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = future # futureself.fn = fn # fnself.args = args # fn的参数self.kwargs = kwargs # fn的参数def run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs) # 调用fnexcept BaseException as exc:self.future.set_exception(exc) # 如果fn有报错,设置错误类型给future# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result) # 将fn的返回值,设置给future

5. executor._adjust_thread_count 函数

def _adjust_thread_count(self):def weakref_cb(_, q&#61;self._work_queue): # 这个函数貌似没什么用&#xff0c;后文分析...num_threads &#61; len(self._threads) # _threads是已经创建的线程集合(set)if num_threads < self._max_workers: # 如果已创建的线程数小于最大线程数thread_name &#61; &#39;%s_%d&#39; % (self._thread_name_prefix or self,num_threads) # 线程名&#61;线程名前缀&#43;线程序号t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,args&#61;(weakref.ref(self, weakref_cb),self._work_queue,...)) # 创建线程&#xff0c;主要的两个参数是weakref.ref(self, weakref_cb), self._work_queue。前者是当前executor的弱引用&#xff0c;后者是executor的任务池t.daemon &#61; True # 守护线程。主线程退出时&#xff0c;守护线程会自动退出t.start() # 启动线程self._threads.add(t) # 加入已创建的线程集合_threads_queues[t] &#61; self._work_queue # _threads_queues是一个全局的弱引用WeakKeyDictionary&#xff0c;记录所有executor的任务池&#xff0c;以便退出时进行清理操作

t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,)

这个 target 指向的 _worker 函数很关键&#xff0c;它会执行一个 while True 循环&#xff0c;从 work_queue 中不断获取 work_item&#xff0c; 然后执行 work_item 的 run() 方法&#xff1a;

def _worker(..., work_queue, ...):try:while True:work_item &#61; work_queue.get(block&#61;True) # 从 work_queue 中取一个 _WorkItemif work_item is not None:work_item.run() # 调用 _WorkItem.run()# del 删除对象的引用&#xff0c;会将 x 的引用计数减一&#xff0c;当引用计数为0时&#xff0c;调用 x.__del__() 销毁实例。del work_itemcontinue...except BaseException:_base.LOGGER.critical(&#39;Exception in worker&#39;, exc_info&#61;True)

这里没有贴出线程退出 While True 的循环&#xff0c;是不想代码过于复杂了。如果感兴趣&#xff0c;可以自己查看源码。

如果你对 weakref.ref(self, weakref_cb) 不理解&#xff0c;下面我讲关键代码提炼出来&#xff0c;模拟该弱引用&#xff1a;

不知道什么是弱引用&#xff0c;请先参考文章&#xff1a;[Python] 高级用法 - 弱引用详解

import weakrefdef _worker(executor_reference):print(&#39;Enter function: _worker&#39;)print(executor_reference)executor &#61; executor_reference()print(executor)if executor is not None:executor.say_hello()def run():a &#61; A(&#39;DemoA&#39;)def weakref_cb(_):print(&#39;Enter function: weakref_cb&#39;)print((weakref.ref(a, weakref_cb)))print(weakref.ref(a))_worker((weakref.ref(a, weakref_cb))) # 两行代码效果相同_worker(weakref.ref(a)) # 两行代码效果相同class A:def __init__(self, name):self.name &#61; namedef say_hello(self):print(f"Hello, I am {self.name}.")if __name__ &#61;&#61; &#39;__main__&#39;:run()

weakref_cb 并没有被执行&#xff1a;

<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.

6. executor 如何退出

使用 with 关键字&#xff0c;在结束时会调用__exit__方法&#xff0c;这个方法调用了 shutdown() 方法。 shutdown() 会做两件事&#xff1a;第一&#xff0c;给 work_queue 传入一个 None 值&#xff0c;线程执行时&#xff0c;获取到 None 就会退出 While True 循环&#xff1b; 第二&#xff0c;如果 wait 为 True&#xff0c;将剩余所有未执行完的线程&#xff0c;调用它们的 join() 方法&#xff0c;让主线程等待它们完成&#xff1a;

class Executor(object):...def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.shutdown(wait&#61;True)return Falseclass ThreadPoolExecutor(_base.Executor):...def shutdown(self, wait&#61;True):with self._shutdown_lock:self._shutdown &#61; Trueself._work_queue.put(None)if wait:for t in self._threads:t.join()

推荐阅读
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 掌握远程执行Linux脚本和命令的技巧
    本文将详细介绍如何利用Python的Paramiko库实现远程执行Linux脚本和命令,帮助读者快速掌握这一实用技能。通过具体的示例和详尽的解释,让初学者也能轻松上手。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 本文提供了使用Java实现Bellman-Ford算法解决POJ 3259问题的代码示例,详细解释了如何通过该算法检测负权环来判断时间旅行的可能性。 ... [详细]
  • 本文探讨了在Java多线程环境下,如何确保具有相同key值的线程能够互斥执行并按顺序输出结果。通过优化代码结构和使用线程安全的数据结构,我们解决了线程同步问题,并实现了预期的并发行为。 ... [详细]
  • 深入理解Tornado模板系统
    本文详细介绍了Tornado框架中模板系统的使用方法。Tornado自带的轻量级、高效且灵活的模板语言位于tornado.template模块,支持嵌入Python代码片段,帮助开发者快速构建动态网页。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 本文详细介绍了 Apache Jena 库中的 Txn.executeWrite 方法,通过多个实际代码示例展示了其在不同场景下的应用,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 根据最新发布的《互联网人才趋势报告》,尽管大量IT从业者已转向Python开发,但随着人工智能和大数据领域的迅猛发展,仍存在巨大的人才缺口。本文将详细介绍如何使用Python编写一个简单的爬虫程序,并提供完整的代码示例。 ... [详细]
  • Scala 实现 UTF-8 编码属性文件读取与克隆
    本文介绍如何使用 Scala 以 UTF-8 编码方式读取属性文件,并实现属性文件的克隆功能。通过这种方式,可以确保配置文件在多线程环境下的一致性和高效性。 ... [详细]
  • 本文详细介绍了Python中文件的基本操作,包括打开、读取、写入和关闭文件的方法,并通过实例展示了如何将Excel文件转换为CSV文件以及进一步转换为HTML文件。此外,还涉及了成绩等级替换的具体实现。 ... [详细]
author-avatar
晓亮居士_264
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有