热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Python使用非阻塞IO

Python的默认IO没有非阻塞(Non-blocking)的功能,默认情况下,以任何方式调用read,都可能会被阻塞。subprocess中的stdoutstderr流场景描述假

Python 的默认 IO 没有非阻塞 (Non-blocking) 的功能,默认情况下,以任何方式调用 read,都可能会被阻塞。


subprocess 中的 stdout/stderr 流


场景描述

假设我们现在需要通过 subprocess 调用一个子程序比如 aria2c, 然后需要实时分析它的 stdout 内容。

那么问题就来了:

import time
import shlex
import subprocess
from subprocess import PIPE
cmd = shlex.split("ria2c -x16 'http://host/file.zip'", stdout=PIPE)
aria2c = subprocess.Popen(cmd, capture_output=True, text=True, encoding='utf-8')
while aria2c.poll() is None: # is running
line = aria2c.stdout.readline() # blocking
# wait
time.sleep(1)

解决办法



  1. 使用新线程去调用 read() 并保存到一个 buffer 中,主线程直接读 buffer。

    • 开销太大



  2. 使用标准库 select 检查是否可读,比较优雅。

  3. 使用 fcntl 为 stdout 设置 O_NONBLOCK 标志


socket 连接中的 io 流

非阻塞读取的实现:

import time
import socket
from asyncio import IncompleteReadError
class SocketStreamReader:
def __init__(self, sock: socket.socket):
sock.setblocking(False) # non-blocking
self.sock = sock
self._recv_buffer = bytearray()
def read(self, num_bytes: int = -1) -> bytes:
raise NotImplementedError
def readexactly(self, num_bytes: int) -> bytes:
buf = bytearray(num_bytes)
pos = 0
while pos n = self._recv_into(memoryview(buf)[pos:])
if n == 0:
raise IncompleteReadError(bytes(buf[:pos]), num_bytes)
pos += n
return bytes(buf)
def readline(self) -> bytes:
return self.readuntil(b"\n")
def readuntil(self, separator: bytes = b"\n") -> bytes:
if len(separator) != 1:
raise ValueError("Only separators of length 1 are supported.")
chunk = bytearray(4096)
start = 0
buf = bytearray(len(self._recv_buffer))
bytes_read = self._recv_into(memoryview(buf))
assert bytes_read == len(buf)
while True:
idx = buf.find(separator, start)
if idx != -1:
break
start = len(self._recv_buffer)
bytes_read = self._recv_into(memoryview(chunk))
if bytes_read == 0:
return None
buf += memoryview(chunk)[:bytes_read]
result = bytes(buf[: idx + 1])
self._recv_buffer = b"".join(
(memoryview(buf)[idx + 1 :], self._recv_buffer)
)
return result
def _recv_into(self, view: memoryview) -> int:
bytes_read = min(len(view), len(self._recv_buffer))
view[:bytes_read] = self._recv_buffer[:bytes_read]
self._recv_buffer = self._recv_buffer[bytes_read:]
if bytes_read == len(view):
return bytes_read
try:
bytes_read += self.sock.recv_into(view[bytes_read:])
except BlockingIOError: # socket not avaliable now
return 0
return bytes_read
socket_pair = "192.168.31.22", 8080
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(socket_pair)
socket_reader = SocketStreamReader(sock)
while True: # 问题:这里会
line = reader.readline()
if line is None:
time.sleep(0.0001)
else:
line = line.decode()
print(line)

参考



  • Non blocking reading from a subprocess output stream in Python

  • Pure-Python non-blocking IO and background IO functions

  • select — Waiting for I/O completion



推荐阅读
author-avatar
手机用户2602937685
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有