Consider the following code:
    #!/usr/bin/env python
    # coding=utf-8
    # string.letters exists only in Python 2; ascii_letters works in Python 2 and 3.
    from string import ascii_letters


    def filter_upper(letters):
        for letter in letters:
            if letter.isupper():
                yield letter


    def filter_selected(letters, selected):
        selected = set(map(str.lower, selected))
        for letter in letters:
            if letter.lower() in selected:
                yield letter


    def main():
        stuff = filter_selected(filter_upper(ascii_letters), ['a', 'b', 'c'])
        print(list(stuff))


    main()
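This is an illustration of a pipeline built from generators. I use this pattern a lot in practice to build data-processing flows. It works like a UNIX pipeline.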
What is the most elegant way to refactor these generators into coroutines that suspend execution at every yield?
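For comparison, here is the push style that the post later calls "David Beazley's coroutines", applied to the same pipeline. Each stage no longer pulls values from an iterator. Instead it receives them via send() and pushes its results to a downstream target. This is a minimal sketch assuming Python 3. The names coroutine, upper_filter, selected_filter and collector are illustrative and not from the original:

```python
from string import ascii_letters


def coroutine(func):
    """Create the generator and advance it to its first yield (priming)."""
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return start


@coroutine
def upper_filter(target):
    # Push stage: forward only upper-case letters to the next stage.
    while True:
        letter = yield
        if letter.isupper():
            target.send(letter)


@coroutine
def selected_filter(selected, target):
    # Push stage: forward only letters whose lower-case form is selected.
    selected = {s.lower() for s in selected}
    while True:
        letter = yield
        if letter.lower() in selected:
            target.send(letter)


@coroutine
def collector(sink):
    # Terminal stage: append everything it receives to a list.
    while True:
        sink.append((yield))


def main():
    result = []
    pipeline = upper_filter(selected_filter(['a', 'b', 'c'], collector(result)))
    for letter in ascii_letters:
        pipeline.send(letter)
    pipeline.close()
    return result


print(main())  # ['A', 'B', 'C']
```

The data flow is inverted compared with the generator version. The producer loop drives the pipeline, and each stage decides when (and whether) to pass a value on.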
My first attempt looked like this:
    #!/usr/bin/env python
    # coding=utf-8
    import asyncio


    @asyncio.coroutine
    def coro():
        for e in ['a', 'b', 'c']:
            future = asyncio.Future()
            future.set_result(e)
            yield from future


    @asyncio.coroutine
    def coro2():
        a = yield from coro()
        print(a)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(coro2())
But for some reason it doesn't work: the variable a becomes None.
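The reason is that "yield from X" evaluates to the value X *returns*, not to the values X yields. Inside coro(), "yield from future" does evaluate to e, but that value is discarded. coro() itself finishes without a return statement, so "yield from coro()" in coro2() evaluates to None. A minimal illustration with plain generators and no event loop, assuming Python 3.3+ (inner, outer and drive are hypothetical names):

```python
def inner():
    for e in ['a', 'b', 'c']:
        yield e        # goes to whoever iterates outer(), not into `a`
    return 'done'      # this becomes the value of `yield from inner()`


def outer():
    a = yield from inner()
    print('a =', a)
    return a


def drive(gen):
    """Exhaust a generator, returning (yielded items, return value)."""
    items = []
    while True:
        try:
            items.append(next(gen))
        except StopIteration as stop:
            return items, stop.value


items, result = drive(outer())
print(items, result)   # ['a', 'b', 'c'] done
```

For coro2() to receive the letters, coro() would have to return them, for example as a list. Alternatively, the two coroutines could exchange them through another channel, such as a queue.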
Here is what I came up with recently.
Server:
    #!/usr/bin/env python
    # coding=utf-8
    """Server that accepts a client and sends it strings from user input."""
    import socket

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host = ''
    port = 5555
    s.bind((host, port))
    s.listen(1)
    print('Listening...')
    conn, addr = s.accept()
    print('Client ({}) connected.'.format(addr))
    while True:
        # Python 2: raw_input() returns a byte string that can be sent as-is.
        conn.send(raw_input('Enter data to send: '))
Client:
    #!/usr/bin/env python
    # coding=utf-8
    """Client that demonstrates a processing pipeline."""
    import trollius as asyncio
    from trollius import From


    @asyncio.coroutine
    def upper(input, output):
        while True:
            char = yield From(input.get())
            print('Got char: ', char)
            yield From(output.put(char.upper()))


    @asyncio.coroutine
    def glue(input, output):
        chunk = []
        while True:
            e = yield From(input.get())
            chunk.append(e)
            print('Current chunk: ', chunk)
            if len(chunk) == 3:
                yield From(output.put(chunk))
                chunk = []


    @asyncio.coroutine
    def tcp_echo_client(loop):
        reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
                                                            loop=loop))
        q1 = asyncio.Queue()
        q2 = asyncio.Queue()
        q3 = asyncio.Queue()

        @asyncio.coroutine
        def printer():
            while True:
                print('Pipeline output: ', (yield From(q3.get())))

        asyncio.async(upper(q1, q2))
        asyncio.async(glue(q2, q3))
        asyncio.async(printer())

        while True:
            data = yield From(reader.read(100))
            if not data:  # empty read: the server closed the connection
                break
            print('Data: ', data)
            for byte in data:
                yield From(q1.put(byte))

        print('Close the socket')
        writer.close()


    @asyncio.coroutine
    def background_stuff():
        while True:
            yield From(asyncio.sleep(3))
            print('Other background stuff...')


    loop = asyncio.get_event_loop()
    asyncio.async(background_stuff())
    loop.run_until_complete(tcp_echo_client(loop))
    loop.close()
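The advantage over "David Beazley's coroutines" is that you can use everything asyncio offers inside these processing units, which communicate through input and output queues.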
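On Python 3.8+ the same queue-connected design can be written with async def/await and asyncio.run instead of trollius' yield From(...). This is a minimal sketch under two assumptions. First, an in-memory string replaces the TCP source. Second, a None sentinel (an addition not in the original) marks the end of input so that the stages can shut down. glue() also flushes a final partial chunk, which the original does not do:

```python
import asyncio

SENTINEL = None  # hypothetical end-of-stream marker


async def upper(inp, out):
    while True:
        char = await inp.get()
        if char is SENTINEL:
            await out.put(SENTINEL)   # pass end-of-stream downstream
            return
        await out.put(char.upper())


async def glue(inp, out, size=3):
    chunk = []
    while True:
        e = await inp.get()
        if e is SENTINEL:
            if chunk:
                await out.put(chunk)  # flush a final partial chunk
            await out.put(SENTINEL)
            return
        chunk.append(e)
        if len(chunk) == size:
            await out.put(chunk)
            chunk = []


async def run_pipeline(data):
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    stages = [asyncio.create_task(upper(q1, q2)),
              asyncio.create_task(glue(q2, q3))]
    for char in data:
        await q1.put(char)
    await q1.put(SENTINEL)
    results = []
    while (chunk := await q3.get()) is not SENTINEL:
        results.append(chunk)
    await asyncio.gather(*stages)     # stages have finished; surface any errors
    return results


print(asyncio.run(run_pipeline('abcdefg')))
# [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
```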
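The downside here is that connecting the pipeline units requires many queue instances. This could be fixed by using a more advanced data structure than asyncio.Queue.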
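One way to avoid wiring queues by hand is to make each stage an async generator that consumes the previous stage with "async for" (assuming Python 3.7+). The stages then chain exactly like the synchronous generator pipeline at the top, and each stage can still await other asyncio operations. This is a different technique from the queue-based client above. Here the stages run in lock-step inside a single task rather than as independent background tasks. The names source, upper and glue are illustrative:

```python
import asyncio


async def source(data):
    for char in data:
        await asyncio.sleep(0)   # stand-in for real I/O such as reader.read()
        yield char


async def upper(chars):
    async for char in chars:
        yield char.upper()


async def glue(items, size=3):
    chunk = []
    async for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk              # flush the last partial chunk


async def main():
    # Stages compose like nested function calls; no queues are needed.
    return [chunk async for chunk in glue(upper(source('abcdefg')))]


print(asyncio.run(main()))
# [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
```

Because every stage is awaited from the consumer's task, an exception raised in any stage propagates up through the async for loops to main().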
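Another downside is that these processing units don't propagate their exceptions to the parent stack frame, because they are "background tasks", whereas "David Beazley's coroutines" do propagate them.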
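If the stages are started as tasks, their exceptions can still reach the caller when the caller awaits the tasks. For example, asyncio.gather re-raises the first exception raised by any of them. On Python 3.11+, asyncio.TaskGroup is a stricter alternative that also cancels the remaining tasks. A minimal sketch; failing_stage and endless_stage are hypothetical stand-ins for pipeline units:

```python
import asyncio


async def failing_stage():
    await asyncio.sleep(0)
    raise ValueError('stage failed')


async def endless_stage():
    while True:
        await asyncio.sleep(0.01)


async def main():
    tasks = [asyncio.create_task(failing_stage()),
             asyncio.create_task(endless_stage())]
    try:
        await asyncio.gather(*tasks)       # re-raises the ValueError here
    except ValueError as exc:
        return f'pipeline failed: {exc}'
    finally:
        for task in tasks:                 # gather does not cancel the others
            task.cancel()


print(asyncio.run(main()))  # pipeline failed: stage failed
```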
Here is what I came up with: https://gist.github.com/AndrewPashkin/04c287def6d165fc2832