将协调生成器管道作为协同程序的最佳方法是什么?

 中国地产人 发布于 2022-12-08 21:13

考虑以下代码:

#!/usr/bin/env python
# coding=utf-8
from string import letters


def filter_upper(letters):
    for letter in letters:
        if letter.isupper():
            yield letter

def filter_selected(letters, selected):
    selected = set(map(str.lower, selected))
    for letter in letters:
        if letter.lower() in selected:
            yield letter


def main():
    stuff = filter_selected(filter_upper(letters), ['a', 'b', 'c'])
    print(list(stuff))

main()

这是从发电机构建的管道的图示.我经常在实践中使用这种模式来构建数据处理流程.这就像UNIX管道.

将生成器重构为每个暂停执行的协同程序的最优雅方法是什么yield

UPDATE

我的第一次尝试是这样的:

#!/usr/bin/env python
# coding=utf-8

import asyncio

@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        yield from future


@asyncio.coroutine
def coro2():
    a = yield from coro()
    print(a)


loop = asyncio.get_event_loop()
loop.run_until_complete(coro2())

但由于某种原因它不起作用 - 变量a变为None.

更新#1

我最近想出了什么:

服务器:

#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and send it strings from user input."""


import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555

s.bind((host, port))
s.listen(1)

print('Listening...')

conn, addr = s.accept()

print('Client ({}) connected.'.format(addr))

while True:
    conn.send(raw_input('Enter data to send: '))

客户:

#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""

import trollius as asyncio
from trollius import From


@asyncio.coroutine
def upper(input, output):
    while True:
        char = yield From(input.get())
        print('Got char: ', char)
        yield From(output.put(char.upper()))


@asyncio.coroutine
def glue(input, output):
    chunk = []
    while True:
        e = yield From(input.get())
        chunk.append(e)
        print('Current chunk: ', chunk)
        if len(chunk) == 3:
            yield From(output.put(chunk))
            chunk = []


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop))
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()
    q3 = asyncio.Queue()

    @asyncio.coroutine
    def printer():
        while True:
            print('Pipeline ouput: ', (yield From(q3.get())))

    asyncio.async(upper(q1, q2))
    asyncio.async(glue(q2, q3))
    asyncio.async(printer())

    while True:
        data = yield From(reader.read(100))
        print('Data: ', data)

        for byte in data:
            yield From(q1.put(byte))

    print('Close the socket')
    writer.close()


@asyncio.coroutine
def background_stuff():
    while True:
        yield From(asyncio.sleep(3))
        print('Other background stuff...')


loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()

优于"David Beazley的协同程序"的是,你可以使用asyncio这些处理单元中的所有东西inputoutput队列.
这里的缺点 - 连接管道单元需要很多队列实例.它可以使用更先进的数据结构来修复asyncio.Queue.
另一个缺点是这种处理单元不会将它们的异常传播到父堆栈帧,因为它们是"后台任务",而"David Beazley的协同程序"确实传播.

更新#2

这就是我的想法:https:
//gist.github.com/AndrewPashkin/04c287def6d165fc2832

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有