Python Concurrency and the Web
Python's main concurrency mechanisms are:
Thread
Process
Coroutine
Because of the GIL, Python threads cannot exploit multiple CPU cores, so concurrent CPU-bound code performs relatively poorly. The examples below walk through Python's concurrency options.
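A quick way to see the GIL's effect for yourself — this is a standalone sketch, separate from the server code that follows — is to time a CPU-bound function run serially and run across threads:

```python
import threading
import time

def fib(n):
    # naive recursive Fibonacci, deliberately CPU-bound
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

def run_serial(jobs, n):
    start = time.time()
    for _ in range(jobs):
        fib(n)
    return time.time() - start

def run_threaded(jobs, n):
    start = time.time()
    threads = [threading.Thread(target=fib, args=(n,)) for _ in range(jobs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

if __name__ == '__main__':
    # With the GIL, only one thread executes Python bytecode at a time,
    # so the threaded run is no faster than the serial one (often slower,
    # due to switching overhead).
    print('serial:  %.2fs' % run_serial(4, 25))
    print('threads: %.2fs' % run_threaded(4, 25))
```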
Threads
We will observe Python threads through a simple web server. First, a small CPU-intensive function:
def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
Next, a fib web server. The program is straightforward, so no line-by-line explanation is needed.
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
Run this shell command to see the computed result:
nc localhost 25002
10
55
Because the server is single-threaded, a second connection gets no result back:
nc localhost 25002
10
To let our server handle multiple requests, we add thread support to the server code:
#server.py
#the server
from socket import *
from fib import fib
from threading import Thread

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        #fib_handler(client)
        Thread(target=fib_handler, args=(client,), daemon=True).start()  # daemon= requires Python 3

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
Run this shell command to see the computed result:
nc localhost 25002
10
55
Because the server is now multi-threaded, a new connection also gets its result:
nc localhost 25002
10
55
Performance test
Let's add a small benchmark:
#perf1.py
from socket import *
from threading import Thread
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25002))

n = 0
def monitor():
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0

Thread(target=monitor).start()

while True:
    start = time.time()
    sock.send(b'1')
    resp = sock.recv(100)
    end = time.time()
    n += 1
The code is very simple: the global variable n counts qps (reqs/sec, requests per second).
Running perf1.py in a shell gives:
106025 reqs/sec
109382 reqs/sec
98211 reqs/sec
105391 reqs/sec
108875 reqs/sec
That is roughly 100k requests per second.
If we start a second benchmark process, the GIL's impact on threads becomes visible:
python3 perf1.py
74677 reqs/sec
78284 reqs/sec
72029 reqs/sec
81719 reqs/sec
82392 reqs/sec
84261 reqs/sec
and the qps in the original shell drops to a similar level:
96488 reqs/sec
99380 reqs/sec
84918 reqs/sec
87485 reqs/sec
85118 reqs/sec
78211 reqs/sec
If we then run
nc localhost 25002
40
to monopolize the server's CPU for a while, the qps in the benchmark shells immediately collapses to
99 reqs/sec
99 reqs/sec
This reflects a quirk of CPython's GIL: it appears to favour the task that burns the most CPU, starving the I/O-bound threads. I don't know the precise reason; pinning it down would probably require reading the GIL's implementation.
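One knob worth knowing here: CPython exposes the interval at which it asks the running thread to release the GIL. Shrinking it can soften (not eliminate) the starvation seen above, at the cost of more switching overhead. A small sketch:

```python
import sys

# CPython's default switch interval is 5 ms: a CPU-bound thread may keep
# the GIL for that long before other threads get a chance to run.
print(sys.getswitchinterval())  # 0.005 by default

# A smaller interval forces more frequent GIL handoffs, which can help
# I/O-bound threads sharing the process with CPU-bound ones.
sys.setswitchinterval(0.001)
print(sys.getswitchinterval())
```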
Thread pools in web programming
Python has a library called cherrypy. Having used it recently, I skimmed its source code; its core is built on a thread pool.
cherrypy maintains the pool with Python's thread-safe queue. The implementation:
class ThreadPool(object):

    """A Request Queue for an HTTPServer which pools threads.

    ThreadPool objects must provide min, get(), put(obj), start()
    and stop(timeout) attributes.
    """

    def __init__(self, server, min=10, max=-1,
                 accepted_queue_size=-1, accepted_queue_timeout=10):
        self.server = server
        self.min = min
        self.max = max
        self._threads = []
        self._queue = queue.Queue(maxsize=accepted_queue_size)
        self._queue_put_timeout = accepted_queue_timeout
        self.get = self._queue.get

    def start(self):
        """Start the pool of threads."""
        for i in range(self.min):
            self._threads.append(WorkerThread(self.server))
        for worker in self._threads:
            worker.setName('CP Server ' + worker.getName())
            worker.start()
        for worker in self._threads:
            while not worker.ready:
                time.sleep(.1)

    ....

    def put(self, obj):
        self._queue.put(obj, block=True, timeout=self._queue_put_timeout)
        if obj is _SHUTDOWNREQUEST:
            return

    def grow(self, amount):
        """Spawn new worker threads (not above self.max)."""
        if self.max > 0:
            budget = max(self.max - len(self._threads), 0)
        else:
            # self.max <= 0 indicates no maximum
            budget = float('inf')

        n_new = min(amount, budget)

        workers = [self._spawn_worker() for i in range(n_new)]
        while not all(worker.ready for worker in workers):
            time.sleep(.1)
        self._threads.extend(workers)

    ....

    def shrink(self, amount):
        """Kill off worker threads (not below self.min)."""
        [...]

    def stop(self, timeout=5):
        # Must shut down threads here so the code that calls
        # this method can know when all threads are stopped.
        [...]
As the code shows, cherrypy initializes the pool at size 10. Each incoming HTTP connection is put on the task queue, and WorkerThread instances continually pull tasks off the queue and execute them: a textbook thread-pool model.
Processes
Since Python threads cannot use multiple cores, Python can use multiple processes to get real parallelism. A Python process is relatively expensive: it amounts to starting another Python interpreter.
#server_pool.py
from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool  # Python 3's concurrent.futures; Python 2 needs the futures backport

pool = Pool(4)  # start a pool of 4 worker processes

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=fib_handler, args=(client,), daemon=True).start()

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
Performance test
The new server's qps:
4613 reqs/sec
4764 reqs/sec
4619 reqs/sec
4393 reqs/sec
4768 reqs/sec
4846 reqs/sec
This is far below the earlier ~100k qps. The main costs are slow process startup and the pool's internal machinery: arguments and results have to be serialized and moved through queues between processes.
In exchange, each connection is largely isolated and is not badly affected by other requests.
Even the following expensive request does not disturb the benchmark:
nc localhost 25002
40
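Stripped of the socket handling, the pool pattern used by the server above boils down to this standalone sketch:

```python
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    # each submit() pickles the call over to a worker process, which
    # computes fib() under its own interpreter and its own GIL
    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fib, n) for n in (10, 20, 30)]
        print([f.result() for f in futures])  # [55, 6765, 832040]
```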
Coroutines
A brief introduction
Coroutines are an old idea, appearing in early operating systems, earlier even than threads and processes.
They are also a relatively difficult concurrency style to understand and use, and coroutine-based code can be hard to read.
Python controls coroutines with yield and next().
def count(n):
    while n > 0:
        yield n  # yield suspends execution here; calling count() produces a generator, which is resumed via next()
        n -= 1

for i in count(5):
    print(i)
Running it prints:
5
4
3
2
1
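The same generator can also be stepped by hand with next(), which is exactly how the scheduler later in this article advances its tasks:

```python
def count(n):
    while n > 0:
        yield n
        n -= 1

gen = count(3)
print(next(gen))  # 3: runs the body up to the first yield
print(next(gen))  # 2: resumes just after the yield
print(next(gen))  # 1
# one more next(gen) would raise StopIteration, the signal a
# scheduler uses to know the task has finished
```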
Now let's write some coroutine code, starting from the earlier server. Why did we use threads in the first place? To avoid blocking, which here comes from two sources: socket IO and CPU time. Coroutines are introduced for the same reason, so first we mark the blocking points in the code.
#server.py
#the server
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()  #blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)  #blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)  #blocking
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
Three socket IO blocking points are marked above; we ignore CPU time for now.
First we insert a yield statement at each blocking point. The yield marks where the code would block and why, so that a scheduler can stay non-blocking: when it reaches a yield whose operation would block, it simply goes off and serves another request instead of waiting. Below we will also implement a simple non-blocking scheduler.
#server.py
#the server
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  #blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)  #blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)  #blocking
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
The program above cannot run yet, because there is no scheduler to drive the yields; execution would simply stop at the points yield marks. That, though, is one of the virtues of coroutines: scheduling is explicit and under our control, rather than the arbitrary interleaving of threads. Below is the code with a scheduler included.
from socket import *
from fib import fib
from collections import deque
from select import select

tasks = deque()
recv_wait = {}
send_wait = {}

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    print('Closed')

tasks.append(fib_server(('', 25003)))
run()
We drive the coroutines with a polling loop plus select. The core is a task queue feeding the run loop, with the recv_wait and send_wait dictionaries parking tasks until their sockets are ready.
Performance test
The new server's qps:
(82262, &#39;reqs/sec&#39;)
(82915, &#39;reqs/sec&#39;)
(82128, &#39;reqs/sec&#39;)
(82867, &#39;reqs/sec&#39;)
(82284, &#39;reqs/sec&#39;)
(82363, &#39;reqs/sec&#39;)
(82954, &#39;reqs/sec&#39;)
Performance is close to the earlier thread model. Coroutines buy us asynchrony, but they still use only one CPU:
when we make the server compute fib(40) and saturate the CPU, qps drops straight to 0.
tornado, a coroutine-based Python web framework
tornado is an asynchronous web framework from Facebook. Using coroutines in tornado is straightforward: the gen.coroutine decorator registers your asynchronous function with tornado's IOLoop. A typical tornado async method looks like:
@gen.coroutine
def post(self):
    resp = yield GetUser()
    self.write(resp)
How tornado's async works
def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    self._running = True
    while True:
        [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise

        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
This is the main loop at the heart of tornado's async scheduling. poll() returns (fd: events) pairs, assigned to event_pairs; the inner while loop pops them one at a time and calls the handler registered for each fd. tornado registers sockets with epoll through the function below. On Linux tornado defaults to epoll; on Windows it can only use select.
def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = handler
    self._impl.register(fd, events | self.ERROR)
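The register-and-dispatch pattern above can be reproduced with the standard library's selectors module, which, like tornado, picks epoll on Linux and falls back to select elsewhere (this is an illustration of the pattern, not tornado's code):

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # epoll on Linux, select as fallback

def on_readable(conn):
    # handler invoked when conn is ready to read; echo the data back
    data = conn.recv(1024)
    if data:
        conn.send(data)

def poll_once():
    # one iteration of the event loop: ask the selector which fds are
    # ready, then invoke the handler registered for each of them
    for key, events in sel.select(timeout=1):
        key.data(key.fileobj)

if __name__ == '__main__':
    a, b = socket.socketpair()            # a stands in for the client
    sel.register(b, selectors.EVENT_READ, on_readable)
    a.send(b'ping')
    poll_once()                           # dispatches on_readable(b)
    print(a.recv(1024))                   # b'ping'
```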
cherrypy's thread pool vs. tornado's coroutines
We compare the two by running the simplest possible app on a single machine.
The test command:
ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"
cherrypy's result:
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests
Time taken for tests: 10.773 seconds
tornado's result:
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests
Time taken for tests: 0.377 seconds
tornado's performance here is striking: when an application involves asynchronous IO, tornado is well worth preferring.
Summary
This article introduced Python's threads, processes, and coroutines with example applications, and ran simple performance comparisons of each model. Because of the GIL, neither threads nor coroutines can use more than one core.
For a CPU-bound web app, the thread and coroutine models perform about the same; threads, being scheduled by the operating system, come out slightly ahead.
For an IO-bound web app, the coroutine model has a large advantage.