如何在Python中设计异步管道模式

8

我正在尝试设计一个异步流水线,可以轻松地制作数据处理流水线。该流水线由多个函数组成。输入数据从流水线的一端进入,从另一端出来。

我希望以以下方式设计流水线:

  1. 可以在流水线中插入其他函数
  2. 已经在流水线中的函数可以弹出

这是我想出来的方案:

import asyncio

@asyncio.coroutine
def add(x):
    return x + 1

@asyncio.coroutine
def prod(x):
    return x * 2

@asyncio.coroutine
def power(x):
    return x ** 3

def connect(funcs):
    def wrapper(*args, **kwargs):
        data_out = yield from funcs[0](*args, **kwargs)
        for func in funcs[1:]:
            data_out = yield from func(data_out)
        return data_out
    return wrapper

pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)

这当然可行,但问题是,如果我想将另一个函数添加进这个流水线(或从中弹出一个函数),我必须再次拆卸和重新连接每个函数。
我想知道是否有更好的方案或设计模式来创建这样的流水线?

1
我认为你明白我的意思,我不想重新创建整个东西,只是因为当你只需要改变一个小部分时,重新创建一切并不优雅。如果管道包含数十个函数,并且我需要经常更改某些函数,重建所有内容将变得乏味和低效。 - shelper
1
似乎你可以创建一个Pipeline类并维护一个实例变量来存储函数列表,然后实现方法来获取/删除此列表中的函数。然后只需实现 __call__ ,以便可以将Pipeline实例发送到asyncio事件循环。 - Eric Conner
@EricConner,我不完全理解你的建议。按照你的建议,我认为问题变成了如何实现获取/删除函数?我是否需要为仅更改一个函数而重新连接每个函数? - shelper
@maxymoo 这只是不够优雅... 如果没有更好的解决方案,我会像你和Eric说的那样,创建一个类并使用列表来存储函数。 - shelper
你的程序在步骤中是否使用异步I/O(很可能是网络)?如果没有,就不应该使用asyncio。 - Udi
显示剩余3条评论
2个回答

7
我之前做过类似的事情,只使用multiprocessing库。虽然稍微有点手动,但它可以让你轻松创建和修改你的管道,正如你在问题中所要求的那样。
想法是创建可以存在于多进程池中的函数,它们唯一的参数是输入队列和输出队列。通过传递不同的队列将各个阶段连接在一起。每个阶段在其输入队列上接收一些工作,进行一些更多的工作,并通过其输出队列将结果传递给下一个阶段。
工作进程旋转以尝试从其队列中获取内容,当它们得到内容时,它们会处理它并将结果传递给下一个阶段。所有工作都通过将“毒丸”通过管道进行传递而结束,导致所有阶段退出:
此示例仅在多个工作阶段中构建字符串:
import multiprocessing as mp                                              

POISON_PILL = "STOP"                                                      

def stage1(q_in, q_out):                                                  

    while True:

        # get either work or a poison pill from the previous stage (or main)
        val = q_in.get()                                                  

        # check to see if we got the poison pill - pass it along if we did
        if val == POISON_PILL:                                            
            q_out.put(val)                                                
            return                                                        

        # do stage 1 work                                                                  
        val = val + "Stage 1 did some work.\n"

        # pass the result to the next stage
        q_out.put(val)                                                    

def stage2(q_in, q_out):                                                  

    while True:                                                           

        val = q_in.get()                                                  
        if val == POISON_PILL:                                            
            q_out.put(val)                                                
            return                                                        

        val = val + "Stage 2 did some work.\n"                            
        q_out.put(val)                                                    

def main():                                                               

    pool = mp.Pool()                                                      
    manager = mp.Manager()                                                

    # create managed queues                                               
    q_main_to_s1 = manager.Queue()                                        
    q_s1_to_s2 = manager.Queue()                                          
    q_s2_to_main = manager.Queue()                                        

    # launch workers, passing them the queues they need                   
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))     
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))     

    # Send a message into the pipeline                                    
    q_main_to_s1.put("Main started the job.\n")                           

    # Wait for work to complete                                           
    print(q_s2_to_main.get()+"Main finished the job.")                    

    q_main_to_s1.put(POISON_PILL)                                         

    pool.close()                                                          
    pool.join()                                                           

    return                                                                

if __name__ == "__main__":                                                
    main()

代码产生的输出如下:

主程序开始工作。
第一阶段完成了一些工作。
第二阶段完成了一些工作。
主程序完成了工作。

您可以轻松地通过更改哪些函数获取哪些队列来在管道中添加更多阶段或重新排列它们。我不太熟悉 asyncio 模块,因此无法说明使用 multiprocessing 库会失去哪些功能,但这种方法非常简单易懂,因此我喜欢它的简洁性。

2

我不确定这是否是最佳方法,但这是我的解决方案。

虽然我认为可以使用列表或字典来控制流水线,但我发现使用生成器更容易和更有效。

考虑以下生成器:

def controller():
    old = value = None
    while True:
        new = (yield value)
        value = old
        old = new

这基本上是一个单元素队列,它存储您发送的值,并在下一次调用send(或next)时释放。

示例:

>>> c = controller()
>>> next(c)           # prime the generator
>>> c.send(8)         # send a value
>>> next(c)           # pull the value from the generator
8

通过将管道中的每个协程与其控制器关联,我们将拥有一个外部句柄,可以用于推送每个协程的目标。我们只需要以一种方式定义我们的协程,使它们能在每个周期从控制器中拉取新的目标。

现在考虑以下协程:

def source(controller):
    while True:
        target = next(controller)
        print("source sending to", target.__name__) 
        yield (yield from target)

def add():
    return (yield) + 1

def prod():
    return (yield) * 2

源是一个协程,它不会return,因此在第一次循环后不会终止自身。其他协程是“sink”并且不需要控制器。 您可以将这些协程用于管道中,如以下示例所示。我们最初设置路由source --> add,在接收到第一个结果后,我们将路由更改为source --> prod

# create a controller for the source and prime it 
cont_source = controller()
next(cont_source)

# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()

# create a pipeline
cont_source.send(coro_add)

# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))

# change target of the source
cont_source.send(coro_prod)

# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))

输出:

source sending to add
add = 5
source sending to prod
prod = 16

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接