8得票1回答
Flink作业突然崩溃并显示错误:在消费分区时遇到错误

我的一个流式作业运行了1天10小时后失败了。其中一个子任务突然失败,并导致整个作业崩溃。由于我设置了restart_strategy,作业自动重新启动,但再次以相同的错误崩溃。我找到了任务管理器的日志,在此期间任务失败了,但这对我进行调试并不是很有帮助。有人能提出更好的方法吗?谢谢。 故障周...

8得票4回答
Flink 1.13.2:NoResourceAvailableException

这是在亚马逊的Kinesis数据分析Flink环境中使用Flink 1.13.2运行的。 此应用程序正在运行Kafka主题。当主题具有较小的流量容量时,此应用程序可以正常运行,但当流量更大时,我会遇到此错误。如何进行故障排除、调整和修复? 我看到了一些类似的SO问题,比如这个,但那显然是较...

7得票1回答
自定义 Apache Flink 的 Prometheus 接收器?

我有一系列由Apache Flink处理的时间序列日志,我想将其通过导出到Prometheus来绘制Grafana的数据。是否有示例或java实现方法?例如编写一个自定义的Flink sink,它会不断地将数据汇入Prometheus中。

10得票1回答
Flink作业未分布到多台机器上。

我在 Apache Flink 中有一个小的使用案例,它是一个批处理系统。我需要处理一个文件集合,每个文件的处理必须由一台机器处理。我有下面的代码。始终只有一个任务插槽被占用,并且文件是一个接一个地处理。我有6个节点(因此有6个任务管理器),每个节点配置了4个任务插槽。所以,我期望每次处理24...

7得票2回答
Apache Flink DataStream API 没有 mapPartition 转换。

Spark DStream中有mapPartition API,而Flink DataStream API没有。是否有人能够帮助解释原因。我想要实现一个与Spark reduceByKey类似的API在Flink上。

7得票1回答
何时在Flink中使用transient, 何时不使用?

在这段代码中,我应该使用 transient 吗? 我什么时候可以使用 transient? 有什么区别? 需要您的帮助。 private Map<String, HermesCustomConsumer> topicSourceMap ...

15得票5回答
在AWS Lambda中无法解析${akka.stream.materializer}的值。

我有一个Java应用程序,在其中使用了 Flink API。因此,我想要通过这段代码创建两个包含少量记录的数据集,并将它们与必要的字段一起注册为两个表。 DataSet<Company> comp = env.fromElements( new Co...

7得票3回答
Apache Flink: 由于类型擦除,函数的返回类型无法自动确定

我用Java编写了一个简单的程序,使用Flink框架,可以接受文件或文本作为输入,并使用flatMap函数打印所有单词。 这是我的代码: final ParameterTool params = ParameterTool.fromArgs(args); ...

8得票1回答
Apache Flink:设置并行性的指南?

我希望得到一些简单的规则或指导方针,以确定operator或job并行度应设置为什么值。对我来说,它似乎应该是一个小于等于可用任务槽数量的数字? 例如,假设我有两台task manager机器,每台机器上有4个任务槽。假设在集群上没有其他作业正在运行,在像filter和map这样的操作中,我...

7得票1回答
如何解决:flink kafka消费者中的java.lang.OutOfMemoryError: Direct buffer memory问题

我们在Kubernetes上运行一个5节点的Flink集群(版本为1.6.3),使用了5个分区的Kafka主题作为数据源。有5个作业从该主题中读取数据(使用不同的消费者组),每个作业的并行度都为5。 每个任务管理器占用10GB的内存,任务管理器堆大小被限制为2GB。摄入负载较小(每秒100-...