如何使用Python Ray并行处理大量数据而不占用内存?

5

我在考虑使用 Ray 来实现数据的并行处理:

  • 有大量的数据需要通过流/迭代器进行处理。每个数据项都具有相当大的大小。
  • 每个数据项都需要运行一个函数,该函数将产生相应的结果。
  • 处理后的数据应通过流进行传递或存储在某种仅能在一定时间内接受一定数量数据的汇聚处。

我想知道是否可以用 Ray 实现这个过程。

目前,我已经基于 Python 多进程库创建了以下简单实现:

  • 一个进程读取流并将其传递给一个队列,在 k 个数据项之后阻塞该队列(以使队列所需的内存不超过某个限制)。
  • 有多个工作进程从输入队列中读取并处理数据项。处理后的数据传递到另一个大小有限的结果队列。
  • 另一个进程从结果队列中读取数据项。

在此基础上,一旦工作进程无法再处理任何数据项,队列就会阻止,不会再向工作进程传递更多任务。 如果汇聚进程无法再存储更多数据项,则结果队列将被阻塞,这将进而阻塞工作进程,因此将阻塞输入队列,直到编写进程可以再次编写结果。

那么 Ray 是否有抽象层来完成类似的操作?我该如何确保只传递了一定量的任务给工作进程,并且如何实现单进程输出函数以确保工作进程不能通过太多的结果使内存/存储用尽?

2个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
4
有一个Ray的实验性流式处理API可能是有用的:https://github.com/ray-project/ray/tree/master/python/ray/experimental/streaming 。该API提供了基本的数据源、自定义操作符和输出目标。通过设置队列大小的上限,您还可以为应用程序设置最大内存占用量。 您能否分享一些关于您的应用程序的额外信息? 我们正在谈论什么类型的数据?单个数据项的大小是多少字节?

虽然这回答了问题,但底部的几行最好留作评论。一旦您获得足够的声望,您将能够在其他用户的帖子上留下评论,以向提问者寻求澄清。 - Hoppeduppeanut

0

对于这种用例,我建议使用 Ray 的 并行迭代器。首先,您需要创建一个生成器,该生成器从流式生成器中获取大型对象(请参见ray.util.iter.from_iterators()),并在这些项上链接操作(请参见.for_each())。关键是,中间对象(它们本身可以很大)在被链中的下一个函数消耗后立即从内存中清除,防止您耗尽内存。

最后,您可以使用 .take() 方法控制队列上的执行,直到您的数据汇准备就绪。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,