Kafka -> Flink DataStream -> MongoDB

11

我想设置Flink,以便将来自Apache Kafka的数据流转换并重定向到MongoDB。为了测试目的,我正在基于flink-streaming-connectors.kafka示例进行构建(https://github.com/apache/flink)。

Kafka流被Flink正确地读取,我可以对它们进行映射等操作,但是当我想将每个接收到和转换后的消息保存到MongoDB时,问题就出现了。我在github上找到了关于MongoDB集成的唯一示例是flink-mongodb-test。不幸的是,它使用静态数据源(数据库),而不是数据流。

我相信应该有一些DataStream.addSink实现用于MongoDB,但显然没有。

如何才能最好地实现它?我需要编写自定义sink函数吗,还是我漏掉了什么?也许应该用不同的方法来完成?

我没有绑定任何解决方案,所以欢迎任何建议。

下面是一个示例,说明我输入的内容以及我需要存储的输出内容。

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>

Flink: DataStream.map({
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection

正如你在这个例子中看到的,我主要使用Flink来缓冲Kafka的消息流和进行一些基本的解析。

2个回答

4
作为对Robert Metzger答案的替代方案,您可以将结果再次写入Kafka,然后使用维护的Kafka连接器之一将主题内容放入MongoDB数据库中。
Kafka -> Flink -> Kafka -> Mongo/Anything
通过这种方法,您可以保持“至少一次语义”行为。

3

目前Flink中没有流式MongoDB Sink可用。

但是,有两种方法可以将数据写入MongoDB:

  • 使用Flink的DataStream.write()调用。它允许您在流式处理中使用任何OutputFormat(来自批处理API)。使用Flink的HadoopOutputFormatWrapper,您可以使用官方的MongoDB Hadoop连接器。

  • 自己实现Sink。使用流式API实现Sink非常容易,我相信MongoDB有一个很好的Java客户端库。

这两种方法都不提供任何复杂的处理保证。但是,当您使用启用了检查点的Kafka和Flink时,您将具有至少一次语义:在错误情况下,数据会再次流向MongoDB Sink。如果您正在进行幂等更新,则重新执行这些更新不应导致任何不一致性。

如果您真的需要MongoDB的精确一次语义,您可能应该在Flink中提交一个JIRA并与社区讨论如何实现此功能。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接