异步IO和concurrent.futures.ThreadPoolExecutor

5

我正在构建一个网络爬虫API,大部分的爬取工作都是使用AsyncIO协程完成的,例如:

async def parse_event(self):
   do scraping

# call the func
asyncio.run(b.parse_event())

这很完美地运作,但由于我需要同时从多个网站抓取数据,所以一开始我使用了concurrent.futures.ThreadPoolExecutor 进行多线程爬虫

但是自从我实现了协程逻辑后,我现在不能直接在我的线程中使用asyncio.run方法。

之前(没有协程):

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
     w1_future = executor.submit(self.w1.parse_event)
     w2_future = executor.submit(self.w2.parse_event)
     w3_future = executor.submit(self.w3.parse_event)

然后,我期望看到以下类似的内容:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
     w1_future = executor.submit(asyncio.run(self.w1.parse_event))
     w2_future = executor.submit(asyncio.run(self.w2.parse_event))
     w3_future = executor.submit(asyncio.run(self.w3.parse_event))

很不幸,它没有起作用。


如果你真的想要的话,应该使用executor.submit(asyncio.run, self.w1.parse_event) - MisterMiyagi
@MisterMiyagi asyncio.run 是否保证线程安全?由于文档中没有提到,我认为它不是线程安全的,因此不安全。 - freakish
我已重新打开这个问题。将带参数的 asyncio.run 传递给执行程序是不安全的。 - freakish
@freakish “asyncio.run”本身并不具备任何并发性,因此它的线程安全性与“+”一样。操作数是决定操作是否线程安全的关键。 - MisterMiyagi
@freakish 这不是猜测,而是长期使用“asyncio”。文档并没有说“asyncio.run”是线程安全的,就像大多数事情的文档也没有谈论线程安全性一样。 - MisterMiyagi
显示剩余9条评论
1个回答

13

asyncio和线程都是使用单个核心进行并发操作的一种方式。但是它们使用不同的机制: asyncio使用async/await协作式并发,而线程使用GIL抢占式并发。
混合使用两者不能加快执行速度,因为两者仍然使用同一个单核心; 相反,两种机制的开销会减慢程序的运行速度,并且这两种机制之间的交互会使编写正确代码变得复杂。

要实现多个任务之间的并发,请将它们全部提交到单个asyncio事件循环中。相当于executor.submit的是asyncio.create_task; 多个任务可以同时提交,使用asyncio.gather。请注意,它们都在循环内部被调用,而不是在执行器外部。

async def parse_all():
    return await asyncio.gather(
        # all the parsing tasks that should run concurrently
        self.w1.parse_event,
        self.w2.parse_event,
        self.w3.parse_event,
    )

asyncio.run(parse_all())

如果您确实想为每个解析使用单独的线程事件循环,那么您必须使用executor.submit(func, *args)而不是executor.submit(func(args))

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
     w1_future = executor.submit(asyncio.run, self.w1.parse_event())
     w2_future = executor.submit(asyncio.run, self.w2.parse_event())
     w3_future = executor.submit(asyncio.run, self.w3.parse_event())

请注意混合使用asyncio和线程会增加复杂性和约束。您可能希望使用调试模式来检测一些线程和上下文安全问题,但是往往没有文档记录线程和上下文安全的约束和保证;如果需要,请手动测试或检查操作是否安全。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接