有没有一种方法可以在.NET中消费Kafka Ksql推送查询?

6

我目前正使用.NET的Kafka消费者处理大量的Kafka消息。

在我的处理流程中,第一步是解析JSON并丢弃基于JSON中特定字段值的许多消息。

我想要做的就是在第一时间不处理(也就是不下载)那些不需要的消息。

看起来像是kSql查询 - 通过推送查询方式编写 - 可以有效地过滤掉我需要处理的消息。

但是,我该如何通过.NET进行消费呢?我看到一些文档提到了REST API,但我怀疑这不是一个好主意。因为我需要在一天中的高峰时段处理超过10万条记录。(如果我可以有选择地下载和处理消息,我只会处理当前数量的三分之一。)

不幸的是,我无法控制发布者,因此无法更改消息的发布方式或内容。

2个回答

4

您可以按以下方式使用ksqldb Linq提供程序。

使用Nuget包管理器安装软件包:

Install-Package ksqlDB.RestApi.Client

使用C# (.NET)创建查询:
var ksqlDbUrl = @"http://localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);
      
await using var context = new KSqlDBContext(contextOptions);

using var subscription = context.CreateQueryStream<Message>() //stream name
  .Where(p => p.RowTime >= 1510923225000) // add your own conditions
  //....
  .Select(l => new { l.Id, l.Message, l.RowTime })
  .Subscribe(onNext: message =>
  {
  }, onError: error => {  }, onCompleted: () => { });

上述C#代码等同于以下ksql:

SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;

项目Wiki以获取更多操作符。


所有的例子为什么似乎都是使用“http:\"而不是通常的“http://”呢? - Steve Smith

2
是的,您可以使用ksqlDB来完成这个任务。
-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT) 
  WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');

-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
  SELECT * FROM my_source WHERE COL1='FOO';

然后在您的应用程序中使用REST API运行推送查询,仅消费筛选后的消息:

SELECT * FROM target EMIT CHANGES;

除了ksqlDB之外,您可能还想看看社区最近发布的这个项目:https://github.com/LGouellec/kafka-streams-dotnet


2
@Robbin Moffatt。谢谢。我看到了你提到的流项目,但它似乎还处于早期阶段。不过,值得一试。你认为通过REST API处理大量数据是可行的吗?对我来说,这似乎有点靠不住,但我不是这方面的专家。(主要是担心如果我必须进行大量的REST调用,速度会很慢) - Workerbee

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接