Apache Flink中shuffle()和rebalance()的区别

8
我正在撰写我的学士学位论文,主题是比较Apache Spark Streaming和Apache Flink(仅流式处理),我刚刚看到了Flink文档中的“物理分区”部分。问题在于该文档没有很好地解释这两个转换如何工作。直接从文档中摘录如下:
- shuffle(): 根据均匀分布将元素随机分区。 - rebalance(): 轮询方式对元素进行分区,创建每个分区的平等负载。在存在数据倾斜的情况下,有助于性能优化。
来源:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#physical-partitioning 两者都是自动完成的,所以我理解它们都会平等地重新分配数据(shuffle() > 均匀分布和 rebalance() > 轮询),并随机地重新排列数据。然后我推断出,rebalance() 可以更好地分配数据(“每个分区负载均衡”),因此任务必须处理相同数量的数据,但shuffle()可能会创建更大或更小的分区。那么,在哪些情况下你可能更喜欢使用 shuffle() 而不是 rebalance()?我能想到的唯一一件事就是,可能 rebalance() 需要一些处理时间,因此在某些情况下,它可能需要更多的时间来进行重新平衡,而这可能比未来的转换所需的时间更长。我一直在寻找这个问题的答案,但没有人谈论过这个问题,只有 Flink 的邮件列表中提到过,但他们没有解释 shuffle() 的工作原理。

感谢Sneftel帮助我改进了我的问题,让我重新思考我想要问什么;以及Till很好地回答了我的问题。:D


你为什么觉得“rebalance”是随机的? - Sneftel
它不是随机的,它试图以平衡的方式放置元素,以使处理更有效率,但如果这是唯一的区别,那么我认为“shuffle()”存在的理由并不充分,因为“rebalance()”以更有效率的方式执行相同的操作。 - froblesmartin
1
你为什么认为rebalance()更有效率呢?它们只是不同的方法。shuffle()是一种随机负载平衡方法,而rebalance()则是一种显式的贪心方法。 - Sneftel
从文档中我了解到,shuffle() 方法以随机和均匀的方式分配元素,因此可能不会创建相同的负载分区,而 rebalance() 方法则尝试创建所有具有相同负载的分区。然后我推断出,rebalance() 方法以更高效的方式进行工作分配,因为所有任务管理器将处理大致相同的数据。那么如果 rebalance() 方法可以更好地完成同样的工作,为什么还要使用 shuffle() 方法呢?在某些情况下,rebalance() 所需的处理可能会产生比它可以改进的更多的延迟吗?谢谢 :) - froblesmartin
2个回答

15
如文档所述,shuffle会随机分发数据,而rebalance则会以轮询方式分发数据。后者更有效率,因为你不必计算一个随机数。此外,根据随机性,可能会导致某些不太均匀的分布。
另一方面,rebalance总是从第一个元素开始发送到第一个通道。因此,如果元素很少(少于子任务数量),那么只有一些子任务将接收元素,因为你总是从第一个子任务开始发送第一个元素。在流式处理中,这通常不会成为问题,因为通常有一个无限输入流。
实际上,两种方法同时存在的原因是历史原因。shuffle首先被引入。为了使批处理和流处理API更加相似,rebalance随后被引入。

今天的rebalance使用随机通道进行启动:https://github.com/apache/flink/blob/3466c228926f03bc44236d4634a6656f63dd2011/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java#L42 - Oliv

2
这句话是Flink误导人的:

在数据倾斜的情况下,有助于性能优化。

因为它用来描述“重新平衡(rebalance)”,而不是“洗牌(shuffle)”,这暗示了它是区分因素。我的理解是,如果一些项目处理速度较慢,而其他一些项目处理速度较快,则分区器将使用下一个空闲通道将项目发送出去。但实际上并非如此,请比较rebalanceshuffle的代码。rebalance只是将项目添加到下一个通道,而不管它有多忙。
// rebalance
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;

// shuffle
nextChannelToSendTo = random.nextInt(numberOfChannels);

这个语句也可以有不同的理解方式: "load" 不是实际处理时间,而是指项目数量。如果您原始的分区存在偏斜(分区中的项目数量差异很大),则该操作将会将项目均匀地分配到分区中。但在这种情况下,它适用于两种操作。
我的结论:`shuffle` 和 `rebalance` 做的事情是一样的,但 `rebalance` 更有效率一些。但差别非常小,以至于您可能注意不到。在我的机器上,`java.util.Random` 可以在单线程上生成 70m 个随机数。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接