Apache Flink:分阶段执行

3
由于我想执行针对Flink编写的Scala程序的性能测量,我希望您可以逐步执行它。
execute first operator; materialize result;
execute second operator; materialize result;
...

等等。原始代码:

var filename = new String("<filename>")
var text = env.readTextFile(filename)
var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts.writeAsText("file://result.txt", WriteMode.OVERWRITE)
env.execute()

我希望 var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1) 的执行可以逐步进行。

每个运算符后都调用 env.execute() 是正确的做法吗?

还是在每个操作后写入到 /dev/null,即调用 counts.writeAsText("file:///home/username/dev/null", WriteMode.OVERWRITE),然后调用 env.execute() 更好一些?Flink 是否真正有类似于 NullSink 的东西来达到这个目的?

编辑:我在集群上使用 Flink Scala Shell,并将应用程序设置为使用 parallelism=1 执行以上代码。

1个回答

3

Flink默认使用流水线数据传输来提高作业执行性能。但是,您也可以通过调用强制进行批量数据传输。

ExecutionEnvironment env = ...
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);

这将分离两个操作的执行(除非它们被链接)。您可以从日志文件或检查Web仪表板中获取每个任务的执行时间。请注意,对于链接的操作符,即具有相同并行性且不需要网络混洗的操作符,这将无效。此外,您应该意识到使用批处理传输会增加程序的总体执行时间。我认为在流水线数据处理器中真正分离运算符的执行时间是不可能的。
在每个操作符之后调用execute()是行不通的,因为Flink尚不支持内存中结果的缓存。因此,如果您执行操作符2,则需要将操作符1的结果写入某个持久存储并再次读取它,或者重新执行操作符1。

我将flink与单线程执行进行比较,因此我总是使用parallelism=1调用我的应用程序。那么是否有可能将执行分离? - lary
单线程执行是否会影响操作符的链接? - lary
链接是一种优化技术,它总是应用于DataSet(批处理)程序中。只有在以下情况下才会发生链接:1)两个运算符具有相同的并行性,2)第二个运算符不需要分区,3)第一个运算符只有一个后继。例如,具有相同并行性的两个映射运算符将始终被链接。链接基本上意味着数据不会被序列化传输,而是立即转发到下一个函数中的结果对象。 - Fabian Hueske
那么在我的应用程序中怎么样呢?为了计算单词数,我使用了一个 map 和一个 reduce(包括 groupBy 和 sum)。但是我仍然有三个运算符,对吧?并且在 map 之后需要进行 reduce 的分区。所以它不会在 map 之后被链接起来吗?但是默认情况下,所有运算符的并行度都是 1,对吧?那么链接起来吗?我还没有完全理解... - lary
对于链式调用,所有条件都需要满足。所以在你的情况下,映射器和减少器不会被链接在一起,因为Reduce需要一个shuffle操作。然而,Flink会注入一个Combiner,并将其链接到mapper上。 - Fabian Hueske

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接