热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Python]future线程处理并发的核心源码详解

ThreadPoolExecutor核心工作详解ThreadPoolExecutor内部是如何工作的?什么是future对象?线程什么时候被创建&#x

ThreadPoolExecutor 核心工作详解


  1. ThreadPoolExecutor内部是如何工作的?
  2. 什么是future对象?
  3. 线程什么时候被创建?
  4. 任务是如何被线程处理的?
  5. 结束之后如何销毁的

1. ThreadPoolExecutor

我们所有的工作都是在ThreadPoolExecutor完成的:

with futures.ThreadPoolExecutor(max_workers=2) as executor:...

executor中定义了几个关键的变量。

class ThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix='', ...):self._max_workers = max_workers # 最大线程数self._work_queue = queue.SimpleQueue() # 任务池Queue,所有线程从该Queue中取任务然后执行self._threads = set() # 线程池

2. executor.submit() 的核心

future对象是我们通过调用submit()时返回的。

future = executor.submit(sleep_and_print, _i) # submit

看看submit() 做了哪些事情:

# def submit(*args, **kwargs)对参数进行了处理,实际上相当于这样:
def submit(self, fn, *args, **kwargs): ...# 下面是核心代码f = _base.Future() # 创建一个Future()对象w = _WorkItem(f, fn, args, kwargs) # 创建一个_WorkItem,传入可调用的fn和fn的所有参数self._work_queue.put(w) # 将_WorkItem加入_work_queueself._adjust_thread_count() # 调整线程数量,线程就是在这个函数中被创建的return f # 返回future

3. Future 对象

future对象初始化时,定义了几个实例变量。

class Future(object):"""代表一个异步计算的结果"""def __init__(self):"""Initializes the future. Should not be called by clients."""self._condition = threading.Condition()self._state = PENDING # 状态。self._result = None # 结果。处理调用方传入的fn的结果,_result = fn(*args, **kwargs)self._exception = None # 异常值。处理fn时,若fn抛出异常,则设置改异常值。self._done_callbacks = [] # 回调函数数组。用于处理add_done_callback传入的可调用对象。

future有五种状态:

_FUTURE_STATES = [PENDING,RUNNING,CANCELLED,CANCELLED_AND_NOTIFIED,FINISHED
]

4. WorkItem 对象

WorkItem和future是一对一的,submit()会为每一个future都创建一个WorkItem。WorkItem记录待处理的任务fn及其参数,线程会从work_queue中取出WorkItem,调用它的run()方法。WorkItem只定义了一个run()方法:

class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = future # futureself.fn = fn # fnself.args = args # fn的参数self.kwargs = kwargs # fn的参数def run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs) # 调用fnexcept BaseException as exc:self.future.set_exception(exc) # 如果fn有报错,设置错误类型给future# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result) # 将fn的返回值,设置给future

5. executor._adjust_thread_count 函数

def _adjust_thread_count(self):def weakref_cb(_, q&#61;self._work_queue): # 这个函数貌似没什么用&#xff0c;后文分析...num_threads &#61; len(self._threads) # _threads是已经创建的线程集合(set)if num_threads < self._max_workers: # 如果已创建的线程数小于最大线程数thread_name &#61; &#39;%s_%d&#39; % (self._thread_name_prefix or self,num_threads) # 线程名&#61;线程名前缀&#43;线程序号t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,args&#61;(weakref.ref(self, weakref_cb),self._work_queue,...)) # 创建线程&#xff0c;主要的两个参数是weakref.ref(self, weakref_cb), self._work_queue。前者是当前executor的弱引用&#xff0c;后者是executor的任务池t.daemon &#61; True # 守护线程。主线程退出时&#xff0c;守护线程会自动退出t.start() # 启动线程self._threads.add(t) # 加入已创建的线程集合_threads_queues[t] &#61; self._work_queue # _threads_queues是一个全局的弱引用WeakKeyDictionary&#xff0c;记录所有executor的任务池&#xff0c;以便退出时进行清理操作

t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,)

这个 target 指向的 _worker 函数很关键&#xff0c;它会执行一个 while True 循环&#xff0c;从 work_queue 中不断获取 work_item&#xff0c; 然后执行 work_item 的 run() 方法&#xff1a;

def _worker(..., work_queue, ...):try:while True:work_item &#61; work_queue.get(block&#61;True) # 从 work_queue 中取一个 _WorkItemif work_item is not None:work_item.run() # 调用 _WorkItem.run()# del 删除对象的引用&#xff0c;会将 x 的引用计数减一&#xff0c;当引用计数为0时&#xff0c;调用 x.__del__() 销毁实例。del work_itemcontinue...except BaseException:_base.LOGGER.critical(&#39;Exception in worker&#39;, exc_info&#61;True)

这里没有贴出线程退出 While True 的循环&#xff0c;是不想代码过于复杂了。如果感兴趣&#xff0c;可以自己查看源码。

如果你对 weakref.ref(self, weakref_cb) 不理解&#xff0c;下面我讲关键代码提炼出来&#xff0c;模拟该弱引用&#xff1a;

不知道什么是弱引用&#xff0c;请先参考文章&#xff1a;[Python] 高级用法 - 弱引用详解

import weakrefdef _worker(executor_reference):print(&#39;Enter function: _worker&#39;)print(executor_reference)executor &#61; executor_reference()print(executor)if executor is not None:executor.say_hello()def run():a &#61; A(&#39;DemoA&#39;)def weakref_cb(_):print(&#39;Enter function: weakref_cb&#39;)print((weakref.ref(a, weakref_cb)))print(weakref.ref(a))_worker((weakref.ref(a, weakref_cb))) # 两行代码效果相同_worker(weakref.ref(a)) # 两行代码效果相同class A:def __init__(self, name):self.name &#61; namedef say_hello(self):print(f"Hello, I am {self.name}.")if __name__ &#61;&#61; &#39;__main__&#39;:run()

weakref_cb 并没有被执行&#xff1a;

<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.

6. executor 如何退出

使用 with 关键字&#xff0c;在结束时会调用__exit__方法&#xff0c;这个方法调用了 shutdown() 方法。 shutdown() 会做两件事&#xff1a;第一&#xff0c;给 work_queue 传入一个 None 值&#xff0c;线程执行时&#xff0c;获取到 None 就会退出 While True 循环&#xff1b; 第二&#xff0c;如果 wait 为 True&#xff0c;将剩余所有未执行完的线程&#xff0c;调用它们的 join() 方法&#xff0c;让主线程等待它们完成&#xff1a;

class Executor(object):...def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.shutdown(wait&#61;True)return Falseclass ThreadPoolExecutor(_base.Executor):...def shutdown(self, wait&#61;True):with self._shutdown_lock:self._shutdown &#61; Trueself._work_queue.put(None)if wait:for t in self._threads:t.join()

推荐阅读
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 本文详细解析了Python中的os和sys模块,介绍了它们的功能、常用方法及其在实际编程中的应用。 ... [详细]
  • 本文详细介绍如何使用Python进行配置文件的读写操作,涵盖常见的配置文件格式(如INI、JSON、TOML和YAML),并提供具体的代码示例。 ... [详细]
  • 技术分享:从动态网站提取站点密钥的解决方案
    本文探讨了如何从动态网站中提取站点密钥,特别是针对验证码(reCAPTCHA)的处理方法。通过结合Selenium和requests库,提供了详细的代码示例和优化建议。 ... [详细]
  • 掌握远程执行Linux脚本和命令的技巧
    本文将详细介绍如何利用Python的Paramiko库实现远程执行Linux脚本和命令,帮助读者快速掌握这一实用技能。通过具体的示例和详尽的解释,让初学者也能轻松上手。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 本文详细介绍了如何解决Uploadify插件在Internet Explorer(IE)9和10版本中遇到的点击失效及JQuery运行时错误问题。通过修改相关JavaScript代码,确保上传功能在不同浏览器环境中的一致性和稳定性。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 前言--页数多了以后需要指定到某一页(只做了功能,样式没有细调)html ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 本文探讨了如何在给定整数N的情况下,找到两个不同的整数a和b,使得它们的和最大,并且满足特定的数学条件。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 本文介绍了在Windows环境下使用pydoc工具的方法,并详细解释了如何通过命令行和浏览器查看Python内置函数的文档。此外,还提供了关于raw_input和open函数的具体用法和功能说明。 ... [详细]
  • 本文介绍如何使用Python进行文本处理,包括分词和生成词云图。通过整合多个文本文件、去除停用词并生成词云图,展示文本数据的可视化分析方法。 ... [详细]
author-avatar
晓亮居士_264
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有