热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Python]future线程处理并发的核心源码详解

ThreadPoolExecutor核心工作详解ThreadPoolExecutor内部是如何工作的?什么是future对象?线程什么时候被创建&#x

ThreadPoolExecutor 核心工作详解


  1. ThreadPoolExecutor内部是如何工作的?
  2. 什么是future对象?
  3. 线程什么时候被创建?
  4. 任务是如何被线程处理的?
  5. 结束之后如何销毁的

1. ThreadPoolExecutor

我们所有的工作都是在ThreadPoolExecutor完成的:

with futures.ThreadPoolExecutor(max_workers=2) as executor:...

executor中定义了几个关键的变量。

class ThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix='', ...):self._max_workers = max_workers # 最大线程数self._work_queue = queue.SimpleQueue() # 任务池Queue,所有线程从该Queue中取任务然后执行self._threads = set() # 线程池

2. executor.submit() 的核心

future对象是我们通过调用submit()时返回的。

future = executor.submit(sleep_and_print, _i) # submit

看看submit() 做了哪些事情:

# def submit(*args, **kwargs)对参数进行了处理,实际上相当于这样:
def submit(self, fn, *args, **kwargs): ...# 下面是核心代码f = _base.Future() # 创建一个Future()对象w = _WorkItem(f, fn, args, kwargs) # 创建一个_WorkItem,传入可调用的fn和fn的所有参数self._work_queue.put(w) # 将_WorkItem加入_work_queueself._adjust_thread_count() # 调整线程数量,线程就是在这个函数中被创建的return f # 返回future

3. Future 对象

future对象初始化时,定义了几个实例变量。

class Future(object):"""代表一个异步计算的结果"""def __init__(self):"""Initializes the future. Should not be called by clients."""self._condition = threading.Condition()self._state = PENDING # 状态。self._result = None # 结果。处理调用方传入的fn的结果,_result = fn(*args, **kwargs)self._exception = None # 异常值。处理fn时,若fn抛出异常,则设置改异常值。self._done_callbacks = [] # 回调函数数组。用于处理add_done_callback传入的可调用对象。

future有五种状态:

_FUTURE_STATES = [PENDING,RUNNING,CANCELLED,CANCELLED_AND_NOTIFIED,FINISHED
]

4. WorkItem 对象

WorkItem和future是一对一的,submit()会为每一个future都创建一个WorkItem。WorkItem记录待处理的任务fn及其参数,线程会从work_queue中取出WorkItem,调用它的run()方法。WorkItem只定义了一个run()方法:

class _WorkItem(object):def __init__(self, future, fn, args, kwargs):self.future = future # futureself.fn = fn # fnself.args = args # fn的参数self.kwargs = kwargs # fn的参数def run(self):if not self.future.set_running_or_notify_cancel():returntry:result = self.fn(*self.args, **self.kwargs) # 调用fnexcept BaseException as exc:self.future.set_exception(exc) # 如果fn有报错,设置错误类型给future# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result) # 将fn的返回值,设置给future

5. executor._adjust_thread_count 函数

def _adjust_thread_count(self):def weakref_cb(_, q&#61;self._work_queue): # 这个函数貌似没什么用&#xff0c;后文分析...num_threads &#61; len(self._threads) # _threads是已经创建的线程集合(set)if num_threads < self._max_workers: # 如果已创建的线程数小于最大线程数thread_name &#61; &#39;%s_%d&#39; % (self._thread_name_prefix or self,num_threads) # 线程名&#61;线程名前缀&#43;线程序号t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,args&#61;(weakref.ref(self, weakref_cb),self._work_queue,...)) # 创建线程&#xff0c;主要的两个参数是weakref.ref(self, weakref_cb), self._work_queue。前者是当前executor的弱引用&#xff0c;后者是executor的任务池t.daemon &#61; True # 守护线程。主线程退出时&#xff0c;守护线程会自动退出t.start() # 启动线程self._threads.add(t) # 加入已创建的线程集合_threads_queues[t] &#61; self._work_queue # _threads_queues是一个全局的弱引用WeakKeyDictionary&#xff0c;记录所有executor的任务池&#xff0c;以便退出时进行清理操作

t &#61; threading.Thread(name&#61;thread_name, target&#61;_worker,)

这个 target 指向的 _worker 函数很关键&#xff0c;它会执行一个 while True 循环&#xff0c;从 work_queue 中不断获取 work_item&#xff0c; 然后执行 work_item 的 run() 方法&#xff1a;

def _worker(..., work_queue, ...):try:while True:work_item &#61; work_queue.get(block&#61;True) # 从 work_queue 中取一个 _WorkItemif work_item is not None:work_item.run() # 调用 _WorkItem.run()# del 删除对象的引用&#xff0c;会将 x 的引用计数减一&#xff0c;当引用计数为0时&#xff0c;调用 x.__del__() 销毁实例。del work_itemcontinue...except BaseException:_base.LOGGER.critical(&#39;Exception in worker&#39;, exc_info&#61;True)

这里没有贴出线程退出 While True 的循环&#xff0c;是不想代码过于复杂了。如果感兴趣&#xff0c;可以自己查看源码。

如果你对 weakref.ref(self, weakref_cb) 不理解&#xff0c;下面我讲关键代码提炼出来&#xff0c;模拟该弱引用&#xff1a;

不知道什么是弱引用&#xff0c;请先参考文章&#xff1a;[Python] 高级用法 - 弱引用详解

import weakrefdef _worker(executor_reference):print(&#39;Enter function: _worker&#39;)print(executor_reference)executor &#61; executor_reference()print(executor)if executor is not None:executor.say_hello()def run():a &#61; A(&#39;DemoA&#39;)def weakref_cb(_):print(&#39;Enter function: weakref_cb&#39;)print((weakref.ref(a, weakref_cb)))print(weakref.ref(a))_worker((weakref.ref(a, weakref_cb))) # 两行代码效果相同_worker(weakref.ref(a)) # 两行代码效果相同class A:def __init__(self, name):self.name &#61; namedef say_hello(self):print(f"Hello, I am {self.name}.")if __name__ &#61;&#61; &#39;__main__&#39;:run()

weakref_cb 并没有被执行&#xff1a;

<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.
Enter function: _worker
<weakref at 0x0000027068011958; to &#39;A&#39; at 0x000002707F073848>
<__main__.A object at 0x000002707F073848>
Hello, I am DemoA.

6. executor 如何退出

使用 with 关键字&#xff0c;在结束时会调用__exit__方法&#xff0c;这个方法调用了 shutdown() 方法。 shutdown() 会做两件事&#xff1a;第一&#xff0c;给 work_queue 传入一个 None 值&#xff0c;线程执行时&#xff0c;获取到 None 就会退出 While True 循环&#xff1b; 第二&#xff0c;如果 wait 为 True&#xff0c;将剩余所有未执行完的线程&#xff0c;调用它们的 join() 方法&#xff0c;让主线程等待它们完成&#xff1a;

class Executor(object):...def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.shutdown(wait&#61;True)return Falseclass ThreadPoolExecutor(_base.Executor):...def shutdown(self, wait&#61;True):with self._shutdown_lock:self._shutdown &#61; Trueself._work_queue.put(None)if wait:for t in self._threads:t.join()

推荐阅读
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • OO第一单元自白:简单多项式导函数的设计与bug分析
    本文介绍了作者在学习OO的第一次作业中所遇到的问题及其解决方案。作者通过建立Multinomial和Monomial两个类来实现多项式和单项式,并通过append方法将单项式组合为多项式,并在此过程中合并同类项。作者还介绍了单项式和多项式的求导方法,并解释了如何利用正则表达式提取各个单项式并进行求导。同时,作者还对自己在输入合法性判断上的不足进行了bug分析,指出了自己在处理指数情况时出现的问题,并总结了被hack的原因。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 本文介绍了协程的概念和意义,以及使用greenlet、yield、asyncio、async/await等技术实现协程编程的方法。同时还介绍了事件循环的作用和使用方法,以及如何使用await关键字和Task对象来实现异步编程。最后还提供了一些快速上手的示例代码。 ... [详细]
  • 第七课主要内容:多进程多线程FIFO,LIFO,优先队列线程局部变量进程与线程的选择线程池异步IO概念及twisted案例股票数据抓取 ... [详细]
  • java线程池的实现原理源码分析
    这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • 1简介本文结合数字信号处理课程和Matlab程序设计课程的相关知识,给出了基于Matlab的音乐播放器的总体设计方案,介绍了播放器主要模块的功能,设计与实现方法.我们将该设 ... [详细]
author-avatar
晓亮居士_264
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有