19得票1回答
如何根据数据将一个数据流输出到不同的输出端口?

在Apache Flink中,我有一个元组流。假设是一个非常简单的Tuple1<String>。 元组的值字段可以有任意值(例如'P1'、'P2'等)。可能的取值集合是有限的,但我事先不知道完整的集合(所以可能会有'P362')。 我想根据元组内部的值将该元组写入特定的输出位置。例...

18得票5回答
Kafka客户端超时时间为60000毫秒,在确定分区位置之前已过期。

我正在尝试将Flink连接到Kafka消费者 我使用Docker Compose构建4个容器:zookeeper、kafka、Flink JobManager和Flink TaskManager。 对于zookeeper和Kafka,我使用wurstmeister镜像,而对于Flink,我...

18得票1回答
在Apache Flink中,如何将两个数据流合并,而不考虑窗口时间?

我有两个数据流,想要将它们合并。问题是一个数据流的频率比另一个高得多,并且有时候一个流根本不接收任何事件。是否可以使用来自一个流的最后一个事件,并在每个即将到来的事件上将其与另一个流连接起来? 唯一的解决方案是使用join函数,但必须指定一个共同的窗口,在这个窗口中才能应用join函数。当一个...

15得票5回答
在AWS Lambda中无法解析${akka.stream.materializer}的值。

我有一个Java应用程序,在其中使用了 Flink API。因此,我想要通过这段代码创建两个包含少量记录的数据集,并将它们与必要的字段一起注册为两个表。 DataSet<Company> comp = env.fromElements( new Co...

15得票1回答
如何在Apache Flink中查找和更新数据库记录的状态?

我正在开发一个数据流应用程序,并研究在该项目中使用Apache Flink的可能性。这样做的主要原因是它支持类似于Java 8的Stream API的高级流构造。 我将接收与数据库中特定记录相对应的事件,并且希望能够处理这些事件(来自消息代理,如RabbitMQ或Kafka),最终更新数据库...

15得票2回答
Apache Flink:java.lang.NoClassDefFoundError

我试图按照这个示例进行操作,但是当我尝试编译时,出现了以下错误:Error: Unable to initialize main class com.amazonaws.services.kinesisanalytics.aws Caused by: java.lang.NoClassDefF...

14得票1回答
Spark与Flink:低内存可用性问题

我已经建立了一个Spark和Flink的k-means应用程序。 我的测试案例是在一个包含3个节点的集群上对100万个点进行聚类。 当内存瓶颈开始时,Flink开始将工作外包到磁盘并变得慢,但仍能正常工作。 然而,如果内存满了,Spark会失去执行器并重新启动(无限循环?)。 我尝试通过邮...

14得票2回答
Apache Flink和Hadoop上的MapReduce相比有哪些区别?

Apache Flink相对于Hadoop上的Mapreduce有何不同?它的优势在哪里以及为什么?

14得票1回答
Apache Spark结构化流处理与Apache Flink有什么区别?

我们讨论了以下几个问题: Apache Spark和Apache Flink有什么区别?[已关闭] 在Apache Spark和Apache Flink中,“流式处理”是什么意思? 小批量处理和实时流处理在实践中(而非理论上)有什么区别? 但是,Spark Structured Str...

14得票1回答
在使用多个Kafka源时,如何正确设置Flink的并行度?

我仍然无法清晰地理解并行性,假设我们有一个拥有足够 slot 的 flink 集群。在我们的 flink 作业中,我们从三个不同的 kafka 集群消费了 3 个 kafka 主题,每个主题有 10 个分区。 如果我们想尽快消费消息,那么并行度应该设置为多少? 如果我们将并行度设置为 10...