我不确定我的Flink应用程序是否需要水印。什么时候需要水印? 如果我不需要它们,WatermarkStrategy.noWatermarks()有什么用途?
我正在按照Flink的快速入门示例监控维基百科编辑流进行操作。 这个示例是用Java编写的,而我正在使用Scala进行实现,如下: /** * Wikipedia Edit Monitoring */ object WikipediaEditMonitoring { def mai...
在我们的项目中,有一个 Flink (1.1.3) 流处理作业,它从一个 Kafka 队列读取数据,进行映射函数转换并写入另一个队列。但是在我们引入流程的一部分输出 REST 请求之后,发现出现了问题。为了解决这个问题,我们使用了 PlayFramework 的 WSClient (因为在我们...
我正在使用最新的Flink-1.1.2-Hadoop-27和flink-connector-kafka-0.10.2-hadoop1 jar包。 Flink消费者代码如下: StreamExecutionEnvironment env=StreamExecutionEnvironment....
我有以下格式的数据, SIP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|INVITE RTP|2405463430|4115474257|8.205142580136622E12|2016年11...
我目前在思考如何处理Apache Flink流应用程序中的应用程序错误。一般来说,我看到两种情况: 1.瞬态错误,您希望重新播放输入数据,并且第二次尝试可能会成功处理。例如,对外部服务的依赖暂时不可用。 2.永久性错误,重复处理仍将失败;例如无效的输入数据。 对于第一种情况,似乎常见的解...
我想在kubernetes上运行一个flink作业,并使用(持久性)状态后端,如果任务管理器崩溃,似乎没有问题,因为它们可以询问作业管理器需要从哪个检查点进行恢复,如果我理解正确的话。 似乎崩溃的作业管理器要更加困难。在flip-6页面上我看到需要zookeeper才能知道作业管理器需要使用...
我有一个Flink工作任务,其中我正在从文件夹中读取文件并将其转储到数据库中。每天会有新的文件放入该文件夹中。 我启用了检查点,以便如果由于任何原因Flink任务停止并且我需要重新启动,则Flink任务不应读取已经读取的文件。 我在我的代码中添加了下面的行,但是当我重新启动我的任务时,Fl...
我们在Kubernetes上运行一个5节点的Flink集群(版本为1.6.3),使用了5个分区的Kafka主题作为数据源。有5个作业从该主题中读取数据(使用不同的消费者组),每个作业的并行度都为5。 每个任务管理器占用10GB的内存,任务管理器堆大小被限制为2GB。摄入负载较小(每秒100-...
阅读了 Apache Flink 的几个文档页面(官方文档,dataArtisans),以及 官方存储库 中提供的示例后,我看到他们经常使用已下载的文件作为流数据源,并始终连接到本地主机。 我正在尝试使用 Apache Flink 下载包含动态数据的 JSON 文件。我的意图是尝试将可以访问...