如何使用Python asyncio编写消费者/生产者代码?

4

我的Python版本是3.6.1。

我编写了一些使用Python asyncio实现消费者生产者模型的内容。但它并没有按照预期工作。

虽然创建了四个事件,但没有任何输出被打印出来。

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()

    consumer_1 = asyncio.ensure_future(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.ensure_future(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.ensure_future(producer(queue, 'producer_1'))
    producer_2 = asyncio.ensure_future(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()

    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(main())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

请问您能帮我纠正一下吗?

这段内容与IT技术无关。

我已经成功地使用了multiprocessing.Queue而不是asyncio.Queue https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing.Queue。 - abdusco
@abdusco 抱歉,我忘记了我的Python版本是3.6.1。我将asyncio.Queue更改为multiprocessing.Queue。程序在consumer_1中挂起。 - Jasper_Li
2个回答

3

你的方法存在许多问题,其中一些问题如下:

  • 生产者 中不必要的 asyncio.ensure_future
  • 使用怀疑的顺序将 asyncio.gather 与其他项放在一起
  • 多余的 tasks = [asyncio.ensure_future(main())]asyncio.wait(tasks) 操作
  • 控制不当的 queue

示例:asyncio 生产者/消费者模式:https://asyncio.readthedocs.io/en/latest/producer_consumer.html


以下是适用于您情况的正确的生产者/消费者模式:

import asyncio
import random

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)
        queue.task_done()   # indicate complete task

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await asyncio.sleep(1)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))


async def main():
    queue = asyncio.Queue()

    producer_1 = producer(queue, 'producer_1')
    producer_2 = producer(queue, 'producer_2')

    consumer_1 = asyncio.ensure_future(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.ensure_future(consumer(queue, 'consumer_2'))

    await asyncio.gather(*[producer_1, producer_2], return_exceptions=True)
    await queue.join()  # wait until the consumer has processed all items
    consumer_1.cancel()
    consumer_2.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 

输出结果:
producer_1 put a val: 7
producer_2 put a val: 2
consumer_1 get a val: 7
consumer_2 get a val: 2
producer_1 put a val: 9
producer_2 put a val: 2
consumer_1 get a val: 9
consumer_2 get a val: 2
producer_1 put a val: 9
producer_2 put a val: 3
consumer_1 get a val: 9
consumer_2 get a val: 3
producer_1 put a val: 1
producer_2 put a val: 6
consumer_1 get a val: 1
consumer_2 get a val: 6
producer_1 put a val: 2
producer_2 put a val: 2
consumer_1 get a val: 2
consumer_2 get a val: 2

1

我选择这种布局是因为它非常简洁、简单,易于理解asyncio代码:

import asyncio


async def producer(queue):
    i = 1
    while True:
        await queue.put(f'item {i}')
        i += 1
        await asyncio.sleep(5)


async def consumer(queue):
    while True:
        item = await queue.get()
        print(item)


if __name__ == '__main__':
    queue = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.create_task(producer(queue))
    loop.create_task(consumer(queue))
    loop.run_forever()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接