163得票4回答
Flink和Storm的主要区别是什么?

Flink被与Spark相比较,但我认为这是错误的比较,因为它将一个窗口事件处理系统与微批处理进行比较;同样,将Flink与Samza进行比较对我来说也没有太多意义。在这两种情况下,它比较了实时和批处理事件处理策略,即使在Samza的情况下规模更小。但我想知道Flink与Storm相比如何,因...

30得票4回答
无法找到类型为 org.apache.flink.api.common.typeinfo.TypeInformation[...] 的证据参数的隐式值

我正在尝试编写一些关于Apache Flink的用例。我经常遇到一个错误: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInform...

21得票2回答
操作员遇到的一些难题:Flink中的并行性问题

我刚刚拿到了下面这个例子用于并行处理,并有一些相关的问题: setParallelism(5) 只为 sum 设置 Parallelism 5,还是对 flatMap 和 sum 都设置了 Parallelism 5? 我们是否可以将不同的并行度分别设置给不同的操作符,例如分别设置 sum...

20得票1回答
如何正确实现HTTP sink?

我希望将我的DataStream流的计算结果通过HTTP协议发送到其他服务。我看到有两种可能的实现方式: 在sink中使用同步的Apache HttpClient客户端 public class SyncHttpSink extends RichSinkFunction<Sessio...

20得票3回答
Flink WebUI在从IDE运行时的问题

我想在Web用户界面中查看我的工作。 我使用createLocalEnvironmentWithWebUI,在IDE中代码运行良好,但无法在http://localhost:8081/#/overview中看到我的工作。 val conf: Configuration = new ...

19得票1回答
如何根据数据将一个数据流输出到不同的输出端口?

在Apache Flink中,我有一个元组流。假设是一个非常简单的Tuple1<String>。 元组的值字段可以有任意值(例如'P1'、'P2'等)。可能的取值集合是有限的,但我事先不知道完整的集合(所以可能会有'P362')。 我想根据元组内部的值将该元组写入特定的输出位置。例...

14得票1回答
在使用多个Kafka源时,如何正确设置Flink的并行度?

我仍然无法清晰地理解并行性,假设我们有一个拥有足够 slot 的 flink 集群。在我们的 flink 作业中,我们从三个不同的 kafka 集群消费了 3 个 kafka 主题,每个主题有 10 个分区。 如果我们想尽快消费消息,那么并行度应该设置为多少? 如果我们将并行度设置为 10...

12得票1回答
Flink:流式拓扑中未定义运算符。无法执行。

我正在尝试搭建一个非常基础的flink作业。当我尝试运行时,出现以下错误:Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at...

11得票1回答
Flink:如何处理外部应用程序配置更改的问题

我的要求是每天流式传输数百万条记录,并且它在外部配置参数上有很大的依赖关系。例如,用户可以随时更改Web应用程序中所需的设置,并且在进行更改后,必须使用新的应用程序配置参数来进行流式传输。这些是应用程序级别的配置,我们还有一些需要通过每个数据传递并进行过滤的动态排除参数。 我发现Flink没...

11得票5回答
如何从程序中停止flink流处理作业?

我正在尝试为一个将数据写入kafka话题并从同一kafka话题读取数据的Flink流作业创建JUnit测试,使用FlinkKafkaProducer09和FlinkKafkaConsumer09。我在生产中传递了一个测试数据:DataStream<String> stream = ...