在Flink中,如何将DataStream写入单个文件?

7
DataStreamwriteAsTextwriteAsCsv方法会根据工作线程的数量写入相应数量的文件。据我所知,这些方法只允许您指定这些文件的路径和一些格式设置。
为了调试和测试目的,能够将所有内容打印到一个单独的文件中,而不必更改设置以使用单个工作线程,这将非常有用。
是否有任何非过于复杂的方法可以实现此功能?我怀疑可以通过实现自定义SinkFunction来实现,但对此并不确定(此外,这似乎对于看起来相对简单的事情来说也有点麻烦)。
2个回答

13

将并行度设置为1可以实现此目的。这样写操作只会在一台机器上进行。

writeAsText(path).setParallelism(1);

1
在更新的版本中已被弃用。请参考SteamingFileSink。 - Deepak
回答不错,但我不会点赞,因为它已经过时了,@eseuteo的回答是最新的。 - Cuauhtli

4
在 Flink 1.13 中,不再使用 writeAsText 函数进行此操作,因为它已被弃用。
如可在这里看到,现在应该使用 StreamingFileSink 类和 addSink 操作。关于将并行度设置为 1,这也是不同的(通过将 StreamExecutionEnvironment 的 parallelism 设置为 1,即 setParallelism 方法)。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
  .build()

dataStream.map(_.toString).addSink(sink)

1
StreamingFileSink 在较新的版本中已被弃用,请改用 org.apache.flink.connector.file.sink.FileSink - Sambhav Khare

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接