热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

pythonweb并发性能_python并发与web

python并发与webpython并发主要方式有:Thread(线程)Process(进程)协程python因为GIL的存在使得python的并发无法利用

python并发与web

python并发主要方式有:

Thread(线程)

Process(进程)

协程

python因为GIL的存在使得python的并发无法利用CPU多核的优势以至于性能比较差,下面我们将通过几个例子来介绍python的并发。

线程

我们通过一个简单web server程序来观察python的线程,首先写一个耗时的小函数

def fib(n):

if n <&#61; 2:

return 1

else:

return fib(n - 1) &#43; fib(n - 2)

然后写一个fib web server&#xff0c;程序比较简单就不解释了。

from socket import *

from fib import fib

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

fib_handle(client)

def fib_handler(client):

while True:

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp)

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002))

运行shell命令可以看到计算结果

nc localhost 25002

10

55

由于服务段是单线程的&#xff0c;如果另外启动一个连接将得不到计算结果

nc localhost 25002

10

为了能让我们的server支持多个请求&#xff0c;我们对服务端代码加入多线程支持

#sever.py

#服务端代码

from socket import *

from fib import fib

from threading import Thread

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

#fib_handler(client)

Thread(target&#61;fib_handler, args&#61;(client,), daemon&#61;True).start() #需要在python3下运行

def fib_handler(client):

while True:

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp)

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002)) #在25002端口启动程序

运行shell命令可以看到计算结果

nc localhost 25002

10

55

由于服务端是多线程的&#xff0c;启动一个新连接将得到计算结果

nc localhost 25002

10

55

性能测试

我们加入一段性能测试代码

#perf1.py

from socket import *

from threading import Thread

import time

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.connect((&#39;localhost&#39;, 25002))

n &#61; 0

def monitor():

global n

while True:

time.sleep(1)

print(n, &#39;reqs/sec&#39;)

n &#61; 0

Thread(target&#61;monitor).start()

while True:

start &#61; time.time()

sock.send(b&#39;1&#39;)

resp &#61; sock.recv(100)

end &#61; time.time()

n &#43;&#61; 1

#代码非常简单&#xff0c;通过全局变量n来统计qps(req/sec 每秒请求数)

在shell中运行perf1.py可以看到结果如下:

106025 reqs/sec

109382 reqs/sec

98211 reqs/sec

105391 reqs/sec

108875 reqs/sec

平均每秒请求数大概是10w左右

如果我们另外启动一个进程来进行性能测试就会发现python的GIL对线程造成的影响

python3 perf1.py

74677 reqs/sec

78284 reqs/sec

72029 reqs/sec

81719 reqs/sec

82392 reqs/sec

84261 reqs/sec

并且原来的shell中的qps也是类似结果

96488 reqs/sec

99380 reqs/sec

84918 reqs/sec

87485 reqs/sec

85118 reqs/sec

78211 reqs/sec

如果我们再运行

nc localhost 25002

40

来完全占用服务器资源一段时间&#xff0c;就可以看到shell窗口内的rqs迅速下降到

99 reqs/sec

99 reqs/sec

这也反映了Python的GIL的一个特点&#xff0c;会优先处理占用CPU资源大的任务

具体原因我也不知道&#xff0c;可能需要阅读GIL实现源码才能知道。

线程池在web编程的应用

python有个库叫做cherrypy&#xff0c;最近用到&#xff0c;大致浏览了一下其源代码&#xff0c;其内核使用的是python线程池技术。

cherrypy通过Python线程安全的队列来维护线程池&#xff0c;具体实现为:

class ThreadPool(object):

"""A Request Queue for an HTTPServer which pools threads.

ThreadPool objects must provide min, get(), put(obj), start()

and stop(timeout) attributes.

"""

def __init__(self, server, min&#61;10, max&#61;-1,

accepted_queue_size&#61;-1, accepted_queue_timeout&#61;10):

self.server &#61; server

self.min &#61; min

self.max &#61; max

self._threads &#61; []

self._queue &#61; queue.Queue(maxsize&#61;accepted_queue_size)

self._queue_put_timeout &#61; accepted_queue_timeout

self.get &#61; self._queue.get

def start(self):

"""Start the pool of threads."""

for i in range(self.min):

self._threads.append(WorkerThread(self.server))

for worker in self._threads:

worker.setName(&#39;CP Server &#39; &#43; worker.getName())

worker.start()

for worker in self._threads:

while not worker.ready:

time.sleep(.1)

....

def put(self, obj):

self._queue.put(obj, block&#61;True, timeout&#61;self._queue_put_timeout)

if obj is _SHUTDOWNREQUEST:

return

def grow(self, amount):

"""Spawn new worker threads (not above self.max)."""

if self.max > 0:

budget &#61; max(self.max - len(self._threads), 0)

else:

# self.max <&#61; 0 indicates no maximum

budget &#61; float(&#39;inf&#39;)

n_new &#61; min(amount, budget)

workers &#61; [self._spawn_worker() for i in range(n_new)]

while not all(worker.ready for worker in workers):

time.sleep(.1)

self._threads.extend(workers)

....

def shrink(self, amount):

"""Kill off worker threads (not below self.min)."""

[...]

def stop(self, timeout&#61;5):

# Must shut down threads here so the code that calls

# this method can know when all threads are stopped.

[...]

可以看出来&#xff0c;cherrypy的线程池将大小初始化为10&#xff0c;每当有一个httpconnect进来时就将其放入任务队列中&#xff0c;然后WorkerThread会不断从任务队列中取出任务执行&#xff0c;可以看到这是一个非常标准的线程池模型。

进程

由于Python的thread无法利用多核&#xff0c;为了充分利用多核CPU&#xff0c;Python可以使用了多进程来模拟线程以提高并发的性能。Python的进程代价比较高可以看做是另外再启动一个python进程。

#server_pool.py

from socket import *

from fib import fib

from threading import Thread

from concurrent.futures import ProcessPoolExecutor as Pool #这里用的python3的线程池&#xff0c;对应python2的threadpool

pool &#61; Pool(4) #启动一个大小为4的进程池

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

Thread(target&#61;fib_handler, args&#61;(client,), daemon&#61;True).start()

def fib_handler(client):

while True:

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

future &#61; pool.submit(fib, n)

result &#61; future.result()

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp)

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002))

性能测试

可以看到新的server的qps为&#xff1a;

4613 reqs/sec

4764 reqs/sec

4619 reqs/sec

4393 reqs/sec

4768 reqs/sec

4846 reqs/sec

这个结果远低于前面的10w qps主要原因是进程启动速度较慢&#xff0c;进程池内部逻辑比较复杂&#xff0c;涉及到了数据传输&#xff0c;队列等问题。

但是通过多进程我们可以保证每一个链接相对独立&#xff0c;不会受其他请求太大的影响。

即使我们使用以下耗时的命令也不会影响到性能测试

nc localhost 25502

40

协程

协程简介

协程是一个古老的概念&#xff0c;最早出现在早期的os中&#xff0c;它出现的时间甚至比线程进程还要早。

协程也是一个比较难以理解和运用的并发方式&#xff0c;用协程写出来的代码比较难以理解。

python中使用yield和next来实现协程的控制。

def count(n):

while(n > 0):

yield n #yield起到的作用是blocking&#xff0c;将代码阻塞在这里&#xff0c;生成一个generator&#xff0c;然后通过next调用。

n -&#61; 1

for i in count(5):

print(i)

#可以看到运行结果&#xff1a;

5

4

3

2

1

下面我们通过例子来介绍如何书写协程代码。首先回到之前的代码。首先我们要想到我们为什么要用线程&#xff0c;当然是为了防止阻塞&#xff0c;

这里的阻塞来自socket的IO和cpu占用2个方面。协程的引入也是为了防止阻塞&#xff0c;因此我们先将代码中的阻塞点标记出来。

#sever.py

#服务端代码

from socket import *

from fib import fib

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept() #blocking

print(&#39;Connection&#39;, addr)

fib_handler(client)

def fib_handler(client):

while True:

req &#61; client.recv(100) #blocking

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp) #blocking

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002)) #在25002端口启动程序

上面标记了3个socket IO阻塞点&#xff0c;我们先忽略CPU占用。

首先我们在blocking点插入yield语句&#xff0c;这样做的原因就是&#xff0c;通过yield标记出blocking点以及blocking的原因&#xff0c;这样我们就可以在调度的时候实现noblocking&#xff0c;我们调度的时候遇到yield语句并且block之后就可以直接去执行其他的请求而不用阻塞在这里&#xff0c;这里我们也将实现一个简单的noblocking调度方法。

#sever.py

#服务端代码

from socket import *

from fib import fib

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

yield &#39;recv&#39;, sock

client, addr &#61; sock.accept() #blocking

print(&#39;Connection&#39;, addr)

fib_handler(client)

def fib_handler(client):

while True:

yield &#39;recv&#39;, client

req &#61; client.recv(100) #blocking

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

yield &#39;send&#39;, client

client.send(resp) #blocking

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002)) #在25002端口启动程序

上述程序无法运行&#xff0c;因为我们还没有一个yield的调度器&#xff0c;程序只是单纯的阻塞在了yield所标记的地方&#xff0c;这也是协程的一个好处&#xff0c;可以人为来调度&#xff0c;不像thread一样乱序执行。下面是包含了调度器的代码。

from socket import *

from fib import fib

from threading import Thread

from collections import deque

from concurrent.futures import ProcessPoolExecutor as Pool

from select import select

tasks &#61; deque()

recv_wait &#61; {}

send_wait &#61; {}

def run():

while any([tasks, recv_wait, send_wait]):

while not tasks:

can_recv, can_send, _ &#61; select(recv_wait, send_wait, [])

for s in can_recv:

tasks.append(recv_wait.pop(s))

for s in can_send:

tasks.append(send_wait.pop(s))

task &#61; tasks.popleft()

try:

why, what &#61; next(task)

if why &#61;&#61; &#39;recv&#39;:

recv_wait[what] &#61; task

elif why &#61;&#61; &#39;send&#39;:

send_wait[what] &#61; task

else:

raise RuntimeError("ARG!")

except StopIteration:

print("task done")

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

yield &#39;recv&#39;, sock

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

tasks.append(fib_handler(client))

def fib_handler(client):

while True:

yield &#39;recv&#39;, client

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

yield &#39;send&#39;, client

client.send(resp)

print(&#39;Closed&#39;)

tasks.append(fib_server((&#39;&#39;, 25003)))

run()

我们通过轮询&#43;select来控制协程&#xff0c;核心是用一个task queue来维护程序运行的流水线&#xff0c;用recv_wait和send_wait两个字典来实现任务的分发。

性能测试

可以看到新的server的qps为&#xff1a;

(82262, &#39;reqs/sec&#39;)

(82915, &#39;reqs/sec&#39;)

(82128, &#39;reqs/sec&#39;)

(82867, &#39;reqs/sec&#39;)

(82284, &#39;reqs/sec&#39;)

(82363, &#39;reqs/sec&#39;)

(82954, &#39;reqs/sec&#39;)

与之前的thread模型性能比较接近&#xff0c;协程的好处是异步的&#xff0c;但是协程 仍然只能使用到一个CPU

当我们让服务器计算40的fib从而占满cpu时&#xff0c;qps迅速下降到了0。

tornado 基于协程的 python web框架

tornado是facebook出品的异步web框架,tornado中协程的使用比较简单&#xff0c;利用coroutine.gen装饰器可以将自己的异步函数注册进tornado的ioloop中&#xff0c;tornado异步方法一般的书写方式为:

&#64;gen.coroutime

def post(self):

resp &#61; yield GetUser()

self.write(resp)

tornado异步原理

def start(self):

"""Starts the I/O loop.

The loop will run until one of the I/O handlers calls stop(), which

will make the loop stop after the current event iteration completes.

"""

self._running &#61; True

while True:

[ ... ]

if not self._running:

break

[ ... ]

try:

event_pairs &#61; self._impl.poll(poll_timeout)

except Exception, e:

if e.args &#61;&#61; (4, "Interrupted system call"):

logging.warning("Interrupted system call", exc_info&#61;1)

continue

else:

raise

# Pop one fd at a time from the set of pending fds and run

# its handler. Since that handler may perform actions on

# other file descriptors, there may be reentrant calls to

# this IOLoop that update self._events

self._events.update(event_pairs)

while self._events:

fd, events &#61; self._events.popitem()

try:

self._handlers[fd](fd, events)

except KeyboardInterrupt:

raise

except OSError, e:

if e[0] &#61;&#61; errno.EPIPE:

# Happens when the client closes the connection

pass

else:

logging.error("Exception in I/O handler for fd %d",

fd, exc_info&#61;True)

except:

logging.error("Exception in I/O handler for fd %d",fd, exc_info&#61;True)

这是tornado异步调度的核心主循环&#xff0c;poll()方法返回一个形如(fd: events)的键值对&#xff0c;并赋值给event_pairs变量&#xff0c;在内部的while循环中&#xff0c;event_pairs中的内容被一个一个的取出&#xff0c;然后相应的处理器会被调用&#xff0c;tornado通过下面的函数讲socket注册进epoll中。tornado在linux默认选择epoll&#xff0c;在windows下默认选择select(只能选择select)。

def add_handler(self, fd, handler, events):

"""Registers the given handler to receive the given events for fd."""

self._handlers[fd] &#61; handler

self._impl.register(fd, events | self.ERROR)

cherrypy线程池与tornado协程的比较

我们通过最简单程序运行在单机上进行性能比较

测试的语句为&#xff1a;

ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"

其中cherrypy的表现为&#xff1a;

Completed 100 requests

Completed 200 requests

Completed 300 requests

Completed 400 requests

Completed 500 requests

Completed 600 requests

Completed 700 requests

Completed 800 requests

Completed 900 requests

Completed 1000 requests

Finished 1000 requests

Time taken for tests: 10.773 seconds

tornado的表现为&#xff1a;

Completed 100 requests

Completed 200 requests

Completed 300 requests

Completed 400 requests

Completed 500 requests

Completed 600 requests

Completed 700 requests

Completed 800 requests

Completed 900 requests

Completed 1000 requests

Finished 1000 requests

Time taken for tests: 0.377 seconds

可以看出tornado的性能还是非常惊人的&#xff0c;当应用程序涉及到异步IO还是要尽量使用tornado

总结

本文主要介绍了python的线程、进程和协程以及其应用&#xff0c;并对这几种模型进行了简单的性能分析&#xff0c;python由于GIL的存在&#xff0c;不管是线程还是协程都不能利用到多核。

对于计算密集型的web app线程模型与协程模型的性能大致一样&#xff0c;线程由于调度受操作系统管理&#xff0c;其性能略好。

对于IO密集型的web app协程模型性能会有很大的优势。

参考文献



推荐阅读
  • 本文探讨了 Spring Boot 应用程序在不同配置下支持的最大并发连接数,重点分析了内置服务器(如 Tomcat、Jetty 和 Undertow)的默认设置及其对性能的影响。 ... [详细]
  • 本文详细介绍如何使用arm-eabi-gdb调试Android平台上的C/C++程序。通过具体步骤和实用技巧,帮助开发者更高效地进行调试工作。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 将Web服务部署到Tomcat
    本文介绍了如何在JDeveloper 12c中创建一个Java项目,并将其打包为Web服务,然后部署到Tomcat服务器。内容涵盖从项目创建、编写Web服务代码、配置相关XML文件到最终的本地部署和验证。 ... [详细]
  • 使用Vultr云服务器和Namesilo域名搭建个人网站
    本文详细介绍了如何通过Vultr云服务器和Namesilo域名搭建一个功能齐全的个人网站,包括购买、配置服务器以及绑定域名的具体步骤。文章还提供了详细的命令行操作指南,帮助读者顺利完成建站过程。 ... [详细]
  • MySQL索引详解与优化
    本文深入探讨了MySQL中的索引机制,包括索引的基本概念、优势与劣势、分类及其实现原理,并详细介绍了索引的使用场景和优化技巧。通过具体示例,帮助读者更好地理解和应用索引以提升数据库性能。 ... [详细]
  • 深入理解 SQL 视图、存储过程与事务
    本文详细介绍了SQL中的视图、存储过程和事务的概念及应用。视图为用户提供了一种灵活的数据查询方式,存储过程则封装了复杂的SQL逻辑,而事务确保了数据库操作的完整性和一致性。 ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 数据库内核开发入门 | 搭建研发环境的初步指南
    本课程将带你从零开始,逐步掌握数据库内核开发的基础知识和实践技能,重点介绍如何搭建OceanBase的开发环境。 ... [详细]
  • 解决PHP与MySQL连接时出现500错误的方法
    本文详细探讨了当使用PHP连接MySQL数据库时遇到500内部服务器错误的多种解决方案,提供了详尽的操作步骤和专业建议。无论是初学者还是有经验的开发者,都能从中受益。 ... [详细]
  • 本文详细探讨了Netty中Future及其子类的设计与实现,包括其在并发编程中的作用和具体应用场景。我们将介绍Future的继承体系、关键方法的实现细节,并讨论如何通过监听器和回调机制来处理异步任务的结果。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 本文详细介绍了如何在Ubuntu系统中下载适用于Intel处理器的64位版本,涵盖了不同Linux发行版对64位架构的不同命名方式,并提供了具体的下载链接和步骤。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
author-avatar
奇国的雪儿
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有