操作员遇到的一些难题:Flink中的并行性问题

21

我刚刚拿到了下面这个例子用于并行处理,并有一些相关的问题:

  1. setParallelism(5) 只为 sum 设置 Parallelism 5,还是对 flatMap 和 sum 都设置了 Parallelism 5?

  2. 我们是否可以将不同的并行度分别设置给不同的操作符,例如分别设置 sum 的并行度为 5 和 flatMap 的并行度为 10。

  3. 根据我的理解,keyBy 根据不同的键将 DataStream 划分为逻辑流/分区,假设有 10,000 个不同的键值,那么就有 10,000 个不同的分区,那么有多少个线程会处理这 10,000 个分区?只有 5 个线程吗?如果我们没有设置 setParallelism(5) 会怎样?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
2个回答

15

当在一个操作符上调用setParallelism方法时,它会改变这个特定操作符的并行度。因此,在您的示例中,只有窗口操作符将以5的并行度执行,而前面的flatMap操作符则以默认并行度执行。

因此,您可以为每个操作符设置不同的并行度。但是,请注意,具有不同并行度的操作符无法链接并需要进行重新平衡(类似于洗牌)操作。

如果要为所有操作符设置并行度,则必须通过ExecutionEnvironment#setParallelism API调用来完成。

keyBy操作将输入流分区为与并行操作符实例一样多的分区。这确保了所有具有相同键的元素最终都在同一个分区中。因此,在您将并行度设置为5的示例中,您将得到5个分区。每个分区可以容纳具有不同键的元素。


1
如果我将 setParallelism 设置为 5,则最终会得到 5 个分区,那么如果我将 setMaxParallelism 设置为 5,那么会有多少个分区呢?这是否意味着它会拥有超过 5 个分区?另外,为什么我们既有 setParallelism 又有 setMaxParallelism?我的意思是,我们应该在什么时候使用 setParallelism,什么时候使用 setMaxParallelism? - YuFeng Shen
1
并行性定义了运算符的并行实例数量。最大并行度定义了您可以将作业扩展到的最大并行度。如果您使用保存点重新启动具有更高并行度的作业,则这一点非常重要。 - Till Rohrmann
假设我删除上面的示例中的.keyBy(0),但保留.setParallelism(5),结果仍然有5个分区,不同之处在于相同的键会以不同的分区结束。那么窗口操作符Sum的并行度是多少?即使使用setParallelism(5),它也会变为1吗?根据以下官方文档,非键控窗口的并行度将变为1。https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html,或者仍然是5,因为setParallelism(5)可以覆盖非键控窗口的并行度1吗? - YuFeng Shen
1
对于非键控窗口,即使您尝试将并行度设置为其他值,也会强制执行并行度为“1”。原因是您无法分发未键控的窗口。 - Till Rohrmann
1
很好的解释@TillRohrmann。我正要发布一个问题,询问keyBy()setParallelism()之间的区别,以及如果我在代码中没有使用keyBy但仍然设置了parallelism会发生什么,而你让我的一天变得美好。 - whatsinthename
很高兴我能帮到你 @whatsinthename :-) - Till Rohrmann

2

执行环境级别 正如在此处提到的,Flink程序是在执行环境的上下文中执行的。执行环境为所有运算符、数据源和数据汇定义了默认并行度。可以通过显式配置运算符的并行度来覆盖执行环境并行度。

可以通过调用setParallelism()方法来指定执行环境的默认并行度。要使用并行度为3执行所有运算符、数据源和数据汇,请将执行环境的默认并行度设置为以下内容:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接