7得票2回答
Flink - 添加仪表板

我希望为我的flink作业添加NewRelic工具。我没找到在bin/flink run <job>命令中传递额外的类路径/其他参数的方法。 NewRelic Java代理需要将-javaagent:<path to jar>添加到执行路径中。建议还要传入配置文件路径...

8得票11回答
在Flink 1.11.1中找不到可执行应用程序的ExecutorFactory。

首先,我阅读了此帖子,关于同样的问题(链接),并尝试按照相同的解决方案进行操作(使用mvn创建一个新的快速入门项目并将代码迁移到其中),但是即使在IntelliJ的开箱即用情况下也无法正常工作。 这是我的pom.xml与另一个pom.xml中的依赖项混合在一起。我做错了什么? <!-...

8得票2回答
如何在Flink中支持多个KeyBy

在下面的代码示例中,我正在尝试获取员工记录的流{ Country, Employer, Name, Salary, Age }并转储每个国家最高薪水的员工。不幸的是,Multiple KEY By无法使用。 仅KeyBy(Employer)反映出来,因此我没有得到正确的结果。我错过了什么? ...

8得票1回答
Apache Flink: 状态何时进行序列化/反序列化?

Flink在何时进行操作状态的序列化/反序列化?是在每次get/update时还是基于检查点?状态后端是否有影响? 我怀疑,在具有多样化键(数百万)和每个键每秒数千个事件的键控流情况下,序列化/反序列化可能是一个大问题。我对吗?

10得票1回答
Flink作业未分布到多台机器上。

我在 Apache Flink 中有一个小的使用案例,它是一个批处理系统。我需要处理一个文件集合,每个文件的处理必须由一台机器处理。我有下面的代码。始终只有一个任务插槽被占用,并且文件是一个接一个地处理。我有6个节点(因此有6个任务管理器),每个节点配置了4个任务插槽。所以,我期望每次处理24...

19得票1回答
如何根据数据将一个数据流输出到不同的输出端口?

在Apache Flink中,我有一个元组流。假设是一个非常简单的Tuple1<String>。 元组的值字段可以有任意值(例如'P1'、'P2'等)。可能的取值集合是有限的,但我事先不知道完整的集合(所以可能会有'P362')。 我想根据元组内部的值将该元组写入特定的输出位置。例...

18得票5回答
Kafka客户端超时时间为60000毫秒,在确定分区位置之前已过期。

我正在尝试将Flink连接到Kafka消费者 我使用Docker Compose构建4个容器:zookeeper、kafka、Flink JobManager和Flink TaskManager。 对于zookeeper和Kafka,我使用wurstmeister镜像,而对于Flink,我...

8得票1回答
Apache Flink:设置并行性的指南?

我希望得到一些简单的规则或指导方针,以确定operator或job并行度应设置为什么值。对我来说,它似乎应该是一个小于等于可用任务槽数量的数字? 例如,假设我有两台task manager机器,每台机器上有4个任务槽。假设在集群上没有其他作业正在运行,在像filter和map这样的操作中,我...

8得票1回答
如何在Apache Flink中对GroupedDataSet进行函数的flatMap操作

我想通过flatMap对DataSet.groupBy生成的每个组应用一个函数。尝试调用flatMap时,我收到了编译器错误: error: value flatMap is not a member of org.apache.flink.api.scala.GroupedDataSet ...

26得票3回答
实践中,小批量处理和实时流处理有什么区别?

在实践中,小批量和实时流有哪些区别(不是理论上的差异)?理论上,我理解小批量是在给定时间内批处理数据,而实时流更像是随着数据到达进行操作,但我最大的问题是为什么不能在 epsilon 时间范围内使用小批量(比如说一毫秒)或者说我想了解一个方法为什么比另一个方法更有效? 我最近遇到一个例子,其...