我正在尝试设计一个异步流水线,可以轻松地制作数据处理流水线。该流水线由多个函数组成。输入数据从流水线的一端进入,从另一端出来。
我希望以以下方式设计流水线:
- 可以在流水线中插入其他函数
- 已经在流水线中的函数可以弹出
这是我想出来的方案:
import asyncio
@asyncio.coroutine
def add(x):
return x + 1
@asyncio.coroutine
def prod(x):
return x * 2
@asyncio.coroutine
def power(x):
return x ** 3
def connect(funcs):
def wrapper(*args, **kwargs):
data_out = yield from funcs[0](*args, **kwargs)
for func in funcs[1:]:
data_out = yield from func(data_out)
return data_out
return wrapper
pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)
这当然可行,但问题是,如果我想将另一个函数添加进这个流水线(或从中弹出一个函数),我必须再次拆卸和重新连接每个函数。
我想知道是否有更好的方案或设计模式来创建这样的流水线?
__call__
,以便可以将Pipeline实例发送到asyncio事件循环。 - Eric Conner