雷如何向工作节点共享数据?

30

有许多简单的教程,以及Stack Overflow上的问题和答案声称Ray会以某种方式与worker共享数据,但这些都没有详细说明在哪个操作系统上会共享哪些内容。

例如,在这个SO答案中:https://stackoverflow.com/a/56287012/1382437 ,一个np数组被序列化到共享对象存储中,然后由多个worker使用,所有worker都可以访问相同的数据(代码从该答案复制):

import numpy as np
import ray

ray.init()

@ray.remote
def worker_func(data, i):
    # Do work. This function will have read-only access to
    # the data array.
    return 0

data = np.zeros(10**7)
# Store the large array in shared memory once so that it can be accessed
# by the worker tasks without creating copies.
data_id = ray.put(data)

# Run worker_func 10 times in parallel. This will not create any copies
# of the array. The tasks will run in separate processes.
result_ids = []
for i in range(10):
    result_ids.append(worker_func.remote(data_id, i))

# Get the results.
results = ray.get(result_ids)

ray.put(data)调用将数据的序列化表示放入共享对象存储,并返回其句柄/ID。

当调用worker_func.remote(data_id, i)时,worker_func将传递反序列化的数据。

但是在两者之间发生了什么?显然,data_id用于定位序列化版本的数据并对其进行反序列化。

Q1:当数据被“反序列化”时,是否总是会创建原始数据的副本?我认为是,但不确定。

一旦数据已被反序列化,它就会传递给工作进程。现在,如果需要将相同的数据传递给另一个工作进程,则有两种可能性:

Q2:当已经被反序列化的对象传递给工作进程时,它会通过另一个副本还是完全相同的对象进行传递?如果是完全相同的对象,这是使用标准的共享内存方法在进程之间共享数据吗?在Linux上,这意味着写时复制,因此这是否意味着只要对象被写入,就会创建另一个副本?

Q3:一些教程/答案似乎表明,反序列化和在工作进程之间共享数据的开销在很大程度上取决于数据类型(Numpy与非Numpy),因此细节是什么?为什么numpy数据可以更有效地共享,并且当客户端尝试写入该numpy数组时是否仍然有效(我认为始终会为该进程创建本地副本?)

2个回答

5
这是一个非常好的问题,也是 Ray 的一个很酷的功能。Ray 提供了一种在分布式环境中调度函数的方式,同时还提供了一个集群存储器来管理这些任务之间的数据共享。
以下是 Ray 可以处理的对象类型:
  • 使用 ray.put 添加的对象
  • function.remote 的结果
  • Ray actor(在 Ray 集群中实例化的远程类)
对于所有这些替代方案,对象都由 Ray 对象存储器管理 - 在某些文档中也称为 Plasma(请参见 Ray 文档中的内存管理Ray 架构白皮书中的对象管理)。
在具有多个节点并且每个节点运行多个进程的 Ray 集群中,Ray 可以将对象存储在任何一个位置。
  • 正在运行的进程的本地内存空间
  • 单个节点中所有进程共享的内存空间
  • (仅在需要回收内存时)持久存储/硬盘

例如,当您在Ray中远程调用函数时,Ray需要管理该函数的结果。有两种选择:

  • 如果序列化结果很小,则Ray将直接将其发送回调用者,并将其存储在调用者的本地内存空间中。(请参见下图左侧,在所有者进程中存储结果)
  • 如果序列化结果很大,则Ray将其存储在执行该函数的节点的共享内存中。(请参见下图右侧,在本地节点的共享内存对象存储中存储结果)。

ray example

一般来说,Ray旨在使这些细节对用户透明化。只要您使用适当的Ray API,Ray将按预期运行,并负责管理存储在集群对象存储中的所有对象。

现在回答您的问题:

Q1:数据何时进行序列化/反序列化?

  • 这取决于数据是否需要通过网络传输或者溢出到磁盘。如果数据不需要传输到网络上,也不需要溢出到磁盘上,Ray 将尽量避免对其进行序列化/反序列化,因为这样做会带来成本。例如,在共享内存中的对象不需要进行序列化/反序列化,因为可以直接由访问该内存的进程进行解引用。

Q2:当已经进行过反序列化的对象传递给工作进程时,它是通过另一个副本还是相同的对象传递的?

  • Ray 对象存储中的对象是不可变的(除了 Actor,它是一种特殊类型的对象)。当 Ray 与另一个工作进程共享对象时,它这样做是因为它知道该对象不会更改(相反,Actor 总是保存在单个工作进程中,无法复制到多个工作进程)。

  • 简而言之:您无法修改 Ray 对象存储中的对象。如果您想要更新对象的版本,则需要创建一个新对象。

问题3: 一些教程/答案似乎表明,反序列化和在工作进程之间共享数据的开销因数据类型而异(Numpy与非Numpy),那么具体情况是什么?

  • 某些数据被设计为在内存中与序列化格式非常相似。例如,Arrow对象只需要被“转换”为字节流,并且在不执行任何特殊计算的情况下进行共享。Numpy数据也以C数组的形式布局在内存中,可以简单地将其“转换”为字节缓冲区(另一方面,Python列表是一个引用数组,需要对每个引用的对象进行序列化)

  • 其他类型的数据需要更多计算才能进行序列化。例如,如果您需要序列化一个Python函数以及其闭包,则可能会非常慢。考虑下面的函数:要序列化它,您需要序列化函数,但还需要序列化它从封闭上下文中访问的所有变量(例如 MAX_ELEMENTS)。

MAX_ELEMENTS = 10
def batch_elements(input):
  arr = []
  for elm in input:
    arr.append(elm)
    if len(arr) > MAX_ELEMENTS:
      yield arr
      arr = []

  if arr:
    yield arr

我希望这有所帮助-我很乐意深入探讨。

我想澄清一下,如果集群只使用一台机器,就不会有串行化的问题吗? - undefined

1

Ray正在内部运行一个redis服务器,以便在进程间共享数据。

如果您想了解更多信息,redis会在本地打开一个端口来获取/放置数据,并与多个进程通信。所有数据基本上都必须是“字符串”或“字符串列表”。因此,ray还实现了从redis序列化/反序列化的功能。


谢谢!您能详细解释一下序列化是如何完成的,以及数据何时被复制吗?请参见原问题中的Q1、Q2、Q3。我发现这篇文章https://arrow.apache.org/blog/2017/10/15/fast-python-serialization-with-ray-and-arrow/解释了一些基础知识,但没有解释arrow如何与redis交互,以及何时可以使用零拷贝读取/共享内存。 - Alex I
Q1,是的,它会创建一个副本(序列化为字符串)。例如,之前你有ndarray[1, 1, 1],现在你有了"ndarray, [1, 1, 1]"字符串。这个字符串被存储到Redis服务器(独立)中。Q2,如果你所说的“完全相同的对象”是指指向相同内存地址的指针,那么答案是否定的,工作进程没有获取到相同的内存地址。相反,工作进程获取到了字符串"ndarray, [1, 1, 1]",然后将其反转(反序列化)成一个新的ndarray [1, 1, 1]。Q3不清楚。 - Ben L
谢谢。我认为这不是完全正确的,请参考我发布的链接:“Arrow支持零拷贝读取,因此对象可以自然地存储在共享内存中,并被多个进程使用”。这就是为什么我想要一个详细的阐明。 - Alex I
如果你要在多台机器上分发应用程序,则无法进行零拷贝读取:数据始终必须通过网络连接发送并存储在接收机器上。否则,你会期望进行破坏性传送吗?:) 最好的方法是使用像OpenMPI的“窗口”这样的东西,它是另一台机器内存中的缓冲区,其他机器可以单方面异步地访问网络。计算机体系结构不支持任何类型的直接跨机器内存访问。 - Robin De Schepper
2
实际上,这并不完全正确。Redis仅用于元数据存储,并且与应用程序数据相比容量相对较小。此外,Redis现在正在逐步退出Ray,并且不再需要运行Ray集群!请参阅此链接以获取更多信息:https://www.anyscale.com/blog/redis-in-ray-past-and-future - Stephanie Wang
Redis不再作为默认选项使用。现在使用的是GCS的实现。 - mike01010

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接