热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Python并发编程Futures

目录1.并行和并发2.并发编程之Futures2.1单线程与多线程性能比较2.2到底什么是Futures?2.3为什么多线程每次只能有一个线程执行ÿ

目录

1.并行和并发

2.并发编程之 Futures

2.1 单线程与多线程性能比较

2.2 到底什么是 Futures ?

2.3 为什么多线程每次只能有一个线程执行?

2.4 在Futures,如何判断任务是否完成以及获取结果?



无论对于哪门语言,并发编程都是一项很常用很重要的技巧。正确合理地使用并发编程,无疑会给程序带来极大的性能提升。


1.并行和并发

并发(Concurrency)和并行(Parallelism)这两个术语经常一起使用,导致很多人以为它们是一个意思,其实不然。

在 Python 中,并发并不是指同一时刻有多个操作(thread、task)同时进行。相反,某个特定的时刻,它只允许有一个操作发生,只不过线程 / 任务之间会互相切换,直到完成。

我们来看下面这张图,图中出现了 thread 和 task 两种切换顺序的不同方式,分别对应 Python 中并发的两种形式——threading 和 asyncio。 

对于 threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换。很显然,这样的好处是代码容易书写,因为程序员不需要做任何切换操作的处理;但是切换线程的操作,也有可能出现在一个语句执行的过程中(比如 x += 1),这样就容易出现 race condition 的情况。

而对于 asyncio,主程序想要切换任务时,必须得到此任务可以被切换的通知,这样一来也就可以避免刚刚提到的 race condition 的情况。

至于所谓的并行,指的才是同一时刻、同时发生。Python 中的 multi-processing 便是这个意思,对于 multi-processing,你可以简单地这么理解:比如你的电脑是 6 核处理器,那么在运行程序时,就可以强制 Python 开 6 个进程,同时执行,以加快运行速度,它的原理示意图如下:

简单对比并发和并行:


  • 并发通常应用于 I/O 操作频繁的场景,比如你要从网站上下载多个文件,I/O 操作的时间可能会比 CPU 运行处理的时间长得多。
  • 并行则更多应用于 CPU heavy 的场景,比如 MapReduce 中的并行计算,为了加快运行速度,一般会用多台机器、多个处理器来完成。

2.并发编程之 Futures


2.1 单线程与多线程性能比较

首先通过一个具体的实例,从代码的角度来理解并发编程中的 Futures,并进一步来比较其与单线程的性能区别。假设我们有一个任务,是下载一些网站的内容并打印,如果用单线程的方式,它的基本代码实现如下:

流程比较简单:


  • 先是遍历存储网站的列表;
  • 然后对当前网站执行下载操作;
  • 等到当前操作完成后,再对下一个网站进行同样的操作,一直到结束。

import requests
import timedef download_one(url):resp = requests.get(url)print('Read {} from {}'.format(len(resp.content), url))def download_all(sites):for site in sites:download_one(site)def main():sites = ['https://www.baidu.com/s?wd=Portal:Arts','https://www.baidu.com/s?wd=Portal:History','https://www.baidu.com/s?wd=Portal:Society','https://www.baidu.com/s?wd=Portal:Biography','https://www.baidu.com/s?wd=Portal:Mathematics','https://www.baidu.com/s?wd=Portal:Technology','https://www.baidu.com/s?wd=Portal:Geography','https://www.baidu.com/s?wd=Portal:Science','https://www.baidu.com/s?wd=Computer_science','https://www.baidu.com/s?wd=Python_(programming_language)','https://www.baidu.com/s?wd=Java_(programming_language)','https://www.baidu.com/s?wd=PHP','https://www.baidu.com/s?wd=Node.js','https://www.baidu.com/s?wd=The_C_Programming_Language','https://www.baidu.com/s?wd=Go_(programming_language)']start_time = time.perf_counter()download_all(sites)end_time = time.perf_counter()print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))if __name__ == '__main__':main()

运行结果:

Read 227 from https://www.baidu.com/s?wd=Portal:Arts
Read 227 from https://www.baidu.com/s?wd=Portal:History
Read 227 from https://www.baidu.com/s?wd=Portal:Society
Read 227 from https://www.baidu.com/s?wd=Portal:Biography
Read 227 from https://www.baidu.com/s?wd=Portal:Mathematics
Read 227 from https://www.baidu.com/s?wd=Portal:Technology
Read 227 from https://www.baidu.com/s?wd=Portal:Geography
Read 227 from https://www.baidu.com/s?wd=Portal:Science
Read 227 from https://www.baidu.com/s?wd=Computer_science
Read 227 from https://www.baidu.com/s?wd=Python_(programming_language)
Read 227 from https://www.baidu.com/s?wd=Java_(programming_language)
Read 227 from https://www.baidu.com/s?wd=PHP
Read 227 from https://www.baidu.com/s?wd=Node.js
Read 227 from https://www.baidu.com/s?wd=The_C_Programming_Language
Read 227 from https://www.baidu.com/s?wd=Go_(programming_language)
Download 15 sites in 1.053478813 seconds

我们可以看到总共耗时约 1s+。单线程的优点是简单明了,但是明显效率低下,因为上述程序的绝大多数时间,都浪费在了 I/O 等待上。程序每次对一个网站执行下载操作,都必须等到前一个网站下载完成后才能开始。如果放在实际生产环境中,我们需要下载的网站数量至少是以万为单位的,不难想象,这种方案根本行不通。

接着我们再来看看多线程版本的代码实现:

import concurrent.futures
import requests
import threading
import timedef download_one(url):resp = requests.get(url)print('Read {} from {}'.format(len(resp.content), url))def download_all(sites):with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:executor.map(download_one, sites)def main():sites = ['https://www.baidu.com/s?wd=Portal:Arts','https://www.baidu.com/s?wd=Portal:History','https://www.baidu.com/s?wd=Portal:Society','https://www.baidu.com/s?wd=Portal:Biography','https://www.baidu.com/s?wd=Portal:Mathematics','https://www.baidu.com/s?wd=Portal:Technology','https://www.baidu.com/s?wd=Portal:Geography','https://www.baidu.com/s?wd=Portal:Science','https://www.baidu.com/s?wd=Computer_science','https://www.baidu.com/s?wd=Python_(programming_language)','https://www.baidu.com/s?wd=Java_(programming_language)','https://www.baidu.com/s?wd=PHP','https://www.baidu.com/s?wd=Node.js','https://www.baidu.com/s?wd=The_C_Programming_Language','https://www.baidu.com/s?wd=Go_(programming_language)']start_time = time.perf_counter()download_all(sites)end_time = time.perf_counter()print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))if __name__ == '__main__':main()

运行结果:

Read 227 from https://www.baidu.com/s?wd=Portal:History
Read 227 from https://www.baidu.com/s?wd=Portal:Mathematics
Read 227 from https://www.baidu.com/s?wd=Portal:Society
Read 227 from https://www.baidu.com/s?wd=Portal:Arts
Read 227 from https://www.baidu.com/s?wd=Portal:Biography
Read 227 from https://www.baidu.com/s?wd=Computer_science
Read 227 from https://www.baidu.com/s?wd=Python_(programming_language)
Read 227 from https://www.baidu.com/s?wd=Portal:Science
Read 227 from https://www.baidu.com/s?wd=Portal:Geography
Read 227 from https://www.baidu.com/s?wd=Portal:Technology
Read 227 from https://www.baidu.com/s?wd=Java_(programming_language)
Read 227 from https://www.baidu.com/s?wd=PHP
Read 227 from https://www.baidu.com/s?wd=Node.js
Read 227 from https://www.baidu.com/s?wd=The_C_Programming_Language
Read 227 from https://www.baidu.com/s?wd=Go_(programming_language)
Download 15 sites in 0.25518121699999996 seconds

非常明显,总耗时是 0.25s 左右,效率一下子提升了 4 倍多。具体来看这段代码,它是多线程版本和单线程版的主要区别所在:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:executor.map(download_one, sites)

这里我们创建了一个线程池,总共有 5 个线程可以分配使用。executor.map() 与前面所讲的 Python 内置的 map() 函数类似,表示对 sites 中的每一个元素,并发地调用函数 download_one()。

顺便提一下,在 download_one() 函数中,我们使用的 requests.get() 方法是线程安全的(thread-safe),因此在多线程的环境下,它也可以安全使用,并不会出现 race condition 的情况。

另外,虽然线程的数量可以自己定义,但是线程数并不是越多越好,因为线程的创建、维护和删除也会有一定的开销。所以如果你设置的很大,反而可能会导致速度变慢。我们往往需要根据实际的需求做一些测试,来寻找最优的线程数量

当然,我们也可以用并行的方式去提高程序运行效率。你只需要在 download_all() 函数中,做出下面的变化即可:

with futures.ThreadPoolExecutor(workers) as executor
=>
with futures.ProcessPoolExecutor() as executor:

在需要修改的这部分代码中,函数 ProcessPoolExecutor() 表示创建进程池,使用多个进程并行的执行程序。不过,这里我们通常省略参数 workers,因为系统会自动返回 CPU 的数量作为可以调用的进程数。

并行的方式一般用在 CPU heavy 的场景中,因为对于 I/O heavy 的操作,多数时间都会用于等待,相比于多线程,使用多进程并不会提升效率。反而很多时候,因为 CPU 数量的限制,会导致其执行效率不如多线程版本。


2.2 到底什么是 Futures ?

Python 中的 Futures 模块,位于 concurrent.futures 和 asyncio 中,它们都表示带有延迟的操作。Futures 会将处于等待状态的操作包裹起来放到队列中,这些操作的状态随时可以查询,当然,它们的结果或是异常,也能够在操作完成后被获取。

通常来说,作为用户,我们不用考虑如何去创建 Futures,这些 Futures 底层都会帮我们处理好。我们要做的,实际上是去 schedule 这些 Futures 的执行

比如,Futures 中的 Executor 类,当我们执行 executor.submit(func) 时,它便会安排里面的 func() 函数执行,并返回创建好的 future 实例,以便你之后查询调用。

这里再介绍一些常用的函数。Futures 中的方法 done(),表示相对应的操作是否完成——True 表示完成,False 表示没有完成。不过,要注意,done() 是 non-blocking 的,会立即返回结果。相对应的 add_done_callback(fn),则表示 Futures 完成后,相对应的参数函数 fn,会被通知并执行调用。

Futures 中还有一个重要的函数 result(),它表示当 future 完成后,返回其对应的结果或异常。而 as_completed(fs),则是针对给定的 future 迭代器 fs,在其完成后,返回完成后的迭代器。

所以,上述例子也可以写成下面的形式:


import concurrent.futures
import requests
import timedef download_one(url):resp = requests.get(url)print('Read {} from {}'.format(len(resp.content), url))def download_all(sites):with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:to_do = []for site in sites:future = executor.submit(download_one, site)to_do.append(future)for future in concurrent.futures.as_completed(to_do):future.result()
def main():sites = ['https://www.baidu.com/s?wd=Portal:Arts','https://www.baidu.com/s?wd=Portal:History','https://www.baidu.com/s?wd=Portal:Society','https://www.baidu.com/s?wd=Portal:Biography','https://www.baidu.com/s?wd=Portal:Mathematics','https://www.baidu.com/s?wd=Portal:Technology','https://www.baidu.com/s?wd=Portal:Geography','https://www.baidu.com/s?wd=Portal:Science','https://www.baidu.com/s?wd=Computer_science','https://www.baidu.com/s?wd=Python_(programming_language)','https://www.baidu.com/s?wd=Java_(programming_language)','https://www.baidu.com/s?wd=PHP','https://www.baidu.com/s?wd=Node.js','https://www.baidu.com/s?wd=The_C_Programming_Language','https://www.baidu.com/s?wd=Go_(programming_language)']start_time = time.perf_counter()download_all(sites)end_time = time.perf_counter()print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))if __name__ == '__main__':main()

运行结果:

Read 227 from https://www.baidu.com/s?wd=Portal:History
Read 227 from https://www.baidu.com/s?wd=Portal:Arts
Read 227 from https://www.baidu.com/s?wd=Portal:Society
Read 227 from https://www.baidu.com/s?wd=Portal:Biography
Read 227 from https://www.baidu.com/s?wd=Portal:Mathematics
Read 227 from https://www.baidu.com/s?wd=Python_(programming_language)
Read 227 from https://www.baidu.com/s?wd=Portal:Geography
Read 227 from https://www.baidu.com/s?wd=Portal:Technology
Read 227 from https://www.baidu.com/s?wd=Portal:Science
Read 227 from https://www.baidu.com/s?wd=Computer_science
Read 227 from https://www.baidu.com/s?wd=Go_(programming_language)
Read 227 from https://www.baidu.com/s?wd=PHP
Read 227 from https://www.baidu.com/s?wd=Node.js
Read 227 from https://www.baidu.com/s?wd=The_C_Programming_Language
Read 227 from https://www.baidu.com/s?wd=Java_(programming_language)
Download 15 sites in 0.6074131429999999 seconds

我们首先调用 executor.submit(),将下载每一个网站的内容都放进 future 队列 to_do,等待执行。然后是 as_completed() 函数,在 future 完成后,便输出结果。不过,这里要注意,future 列表中每个 future 完成的顺序,和它在列表中的顺序并不一定完全一致。到底哪个先完成、哪个后完成,取决于系统的调度和每个 future 的执行时间。


2.3 为什么多线程每次只能有一个线程执行?

同一时刻,Python 主程序只允许有一个线程执行,所以 Python 的并发,是通过多线程的切换完成的。你可能会疑惑这到底是为什么呢?

事实上,Python 的解释器并不是线程安全的,为了解决由此带来的 race condition 等问题,Python 便引入了全局解释器锁,也就是同一时刻,只允许一个线程执行。当然,在执行 I/O 操作时,如果一个线程被 block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行。



2.4 在Futures,如何判断任务是否完成以及获取结果?

这里补充2个demo,推荐使用第二种,因为第一种方式是显示等待后拿到结果,如果不知道任务运行多久,这样会比较笨(额,我刚开始就是这样弄的),第二种方法就要优雅很多了,通过future.done()判断线程执行状态是否结束和future.result()拿到函数的返回结果,直接看代码:

方法1:

import time
from logzero import logger
from concurrent.futures import ThreadPoolExecutor # 线程池模块# 全局变量,线程池临时结果存储
_thread_pool_executor_result = []def demo1():logger.info("start demo1 ...")time.sleep(5)logger.info("end demo1 ...")return ["demo1 result"]def callback(data):"""# 线程池运行回调函数:param data::return:"""# 修改全局变量,需要使用关键字globalglobal _thread_pool_executor_result_thread_pool_executor_result = data.result()def main():pool = ThreadPoolExecutor(1)logger.info("debug 1")# 执行完线程后,跟一个函数回调函数pool.submit(demo1, ).add_done_callback(callback)logger.info("debug 2")logger.info(_thread_pool_executor_result)logger.info("debug 3")time.sleep(6)logger.info(_thread_pool_executor_result)if __name__ == '__main__':# 主函数main()

运行结果:

[I 211115 09:16:26 demo222:30] debug 1
[I 211115 09:16:26 demo222:10] start demo1 ...
[I 211115 09:16:26 demo222:33] debug 2
[I 211115 09:16:26 demo222:34] []
[I 211115 09:16:26 demo222:35] debug 3
[I 211115 09:16:31 demo222:12] end demo1 ...
[I 211115 09:16:32 demo222:37] ['demo1 result']

 方法2:

import time
from logzero import logger
from concurrent.futures import ThreadPoolExecutor # 线程池模块# 全局变量,线程池临时结果存储
_thread_pool_executor_result = []def demo1():logger.info("start demo1 ...")time.sleep(5)logger.info("end demo1 ...")return ["demo1 result"]def callback(data):"""# 线程池运行回调函数:param data::return:"""# 修改全局变量,需要使用关键字globalglobal _thread_pool_executor_result_thread_pool_executor_result = data.result()def main():pool = ThreadPoolExecutor(1)logger.info("debug 1")# Futures 中的方法 done(),表示相对应的操作是否完成——True表示完成,False 表示没有完成future = pool.submit(demo1,)logger.info("debug 2")# Ps:这里可以加个超时等待机制,不然可能死循环while True:if future.done():logger.info("future job done.")logger.info(future.result())breakelse:logger.info("future job not done, will sleep 1s.")time.sleep(1)if __name__ == '__main__':# 主函数main()

运行结果:

[I 211115 09:15:15 demo222:30] debug 1
[I 211115 09:15:15 demo222:10] start demo1 ...
[I 211115 09:15:15 demo222:41] debug 2
[I 211115 09:15:15 demo222:49] future job not done, will sleep 1s.
[I 211115 09:15:16 demo222:49] future job not done, will sleep 1s.
[I 211115 09:15:17 demo222:49] future job not done, will sleep 1s.
[I 211115 09:15:18 demo222:49] future job not done, will sleep 1s.
[I 211115 09:15:19 demo222:49] future job not done, will sleep 1s.
[I 211115 09:15:20 demo222:12] end demo1 ...
[I 211115 09:15:20 demo222:45] future job done.
[I 211115 09:15:20 demo222:46] ['demo1 result']


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • 本文讨论了在Windows 8上安装gvim中插件时出现的错误加载问题。作者将EasyMotion插件放在了正确的位置,但加载时却出现了错误。作者提供了下载链接和之前放置插件的位置,并列出了出现的错误信息。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
author-avatar
手机用户2602901285
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有