Kafka Streams: splitting 1 stream into multiple streams


I have a project for which I need to learn Kafka Streams, but I'm having a lot of trouble. I'm using kafka-streams version 1.0.1. We have a main stream topic with messages in the following style:

{
    "phenomenonTime" : "2017-04-03T16:08:19.000Z",
    "resultTime" : "2017-04-03T16:08:19.000Z",
    "result" : {
      "Temperature" : 0,
      "Pressure" : 0,
      "Humidity" : 0,
      "Mean altitude" : 0,
      "Mass PM2.5" : 7.4,
      "Mass Error PM2.5" : 1.5,
      "Mass PM10" : 12.3,
      "Mass Error PM10" : 1.5
    }
  }

This is JSON, and my first problem is that I don't know how to use a JSON deserializer or serializer correctly.
But my main goal is to take the result field of the topic and create the topics Temperature, Pressure, Humidity, Mean altitude, and so on, where the Temperature topic carries the correct value.
How can I achieve this with Kafka Streams? I hope you can help me get a better start with Kafka Streams.
Edit:
Full message + key (formatted):
    Key c45e9532-9810-11e8-8839-03e1e3365152
    Value {
      "phenomenonTime" : "2017-04-03T16:08:09.000Z",
      "resultTime" : "2017-04-03T16:08:09.000Z",
      "result" : {
        "Temperature" : 0,
        "Pressure" : 0,
        "Humidity" : 0,
        "Mean altitude" : 0,
        "Mass PM2.5" : 7.1,
        "Mass Error PM2.5" : 1.5,
        "Mass PM10" : 9.6,
        "Mass Error PM10" : 1.5
      },
      "Datastream@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/Datastream",
      "Datastream" : {
        "unitOfMeasurement" : {
          "name" : null,
          "symbol" : null,
          "definition" : null
        },
        "@iot.id" : "geo.uni-augsburg.de/Fixed-Wing-UAV-1/Datastreams/LOAC_LOCAL_201704031605.mass"
      },
      "FeatureOfInterest@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/FeatureOfInterest",
      "FeatureOfInterest" : {
        "@iot.id" : "c458a1a4-9810-11e8-8839-176a6dbe6951"
      },
      "@iot.id" : "c45e9532-9810-11e8-8839-03e1e3365152",
      "@iot.selfLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')"
    }

Unformatted:

Key c45e9532-9810-11e8-8839-03e1e3365152
Value { "phenomenonTime" : "2017-04-03T16:08:09.000Z", "resultTime" : "2017-04-03T16:08:09.000Z", "result" : { "Temperature" : 0, "Pressure" : 0, "Humidity" : 0, "Mean altitude" : 0, "Mass PM2.5" : 7.1, "Mass Error PM2.5" : 1.5, "Mass PM10" : 9.6, "Mass Error PM10" : 1.5 }, "Datastream@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/Datastream", "Datastream" : { "unitOfMeasurement" : { "name" : null, "symbol" : null, "definition" : null }, "@iot.id" : "geo.uni-augsburg.de/Fixed-Wing-UAV-1/Datastreams/LOAC_LOCAL_201704031605.mass" }, "FeatureOfInterest@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/FeatureOfInterest", "FeatureOfInterest" : { "@iot.id" : "c458a1a4-9810-11e8-8839-176a6dbe6951" }, "@iot.id" : "c45e9532-9810-11e8-8839-03e1e3365152", "@iot.selfLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')" }

But Datastream@iot.navigationLink, Datastream, and so on don't matter. The important thing is that the key has to stay the same.

It would then look like this (https://i.imgur.com/zvwf3g7.png)

Full exported data stream:

https://pastebin.com/PUfhL8fK

Example Kafka client:

https://pastebin.com/y4k7fQgz

1 Answer

To achieve this you need to create several KStream objects, one per desired target topic. To extract the required fields from the main JSON, use the mapValues method on the KStream. To make working with JSON values easier, you can use the JsonSerde from the spring-kafka library (groupId: org.springframework.kafka, artifactId: spring-kafka).
An example for the temperature and pressure topics (do the same for every other target topic):
// Imports (kafka-streams 1.0.x; JsonSerde comes from spring-kafka):
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.kafka.support.serializer.JsonSerde;

// Shared configuration; note that "key.serde"/"value.serde" are deprecated,
// the "default."-prefixed keys are the current ones.
Map<String, String> streamProperties = new HashMap<>();
streamProperties.put("bootstrap.servers", "localhost:9092");
streamProperties.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
streamProperties.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

// Each stream gets its own application.id, so each one consumes the full source topic.
Map<String, String> streamProperties1 = new HashMap<>(streamProperties);
streamProperties1.put("application.id", "temperature");
Map<String, String> streamProperties2 = new HashMap<>(streamProperties);
streamProperties2.put("application.id", "pressure");

// Deserialize the JSON value into a generic Map instead of a dedicated POJO.
Class<Map<String, Object>> genericMapClass = (Class) Map.class;
Consumed<String, Map<String, Object>> consumed = Consumed.with(Serdes.String(), new JsonSerde<>(genericMapClass));
Produced<String, Map<String, Object>> produced = Produced.with(Serdes.String(), new JsonSerde<>(genericMapClass));

StreamsBuilder streamBuilder1 = new StreamsBuilder();
KStream<String, Map<String, Object>> temperatureKStream = streamBuilder1.stream("mainSourceTopic", consumed);
temperatureKStream.mapValues((generalDetails) -> {
    // Pull one measurement out of the nested "result" object and keep the timestamps.
    Object temperatureValue = ((Map) generalDetails.get("result")).get("Temperature");
    Map<String, Object> temperatureMessageDetails = new HashMap<>();
    temperatureMessageDetails.put("Temperature", temperatureValue);
    temperatureMessageDetails.put("phenomenonTime", generalDetails.get("phenomenonTime"));
    temperatureMessageDetails.put("resultTime", generalDetails.get("resultTime"));
    System.out.println("temperatureMessageDetails: " + temperatureMessageDetails);
    return temperatureMessageDetails;
}).to("temperatureTopic", produced);

StreamsBuilder streamBuilder2 = new StreamsBuilder();
KStream<String, Map<String, Object>> pressureKStream = streamBuilder2.stream("mainSourceTopic", consumed);
pressureKStream.mapValues((generalDetails) -> {
    Object pressureValue = ((Map) generalDetails.get("result")).get("Pressure");
    Map<String, Object> pressureMessageDetails = new HashMap<>();
    pressureMessageDetails.put("Pressure", pressureValue);
    pressureMessageDetails.put("phenomenonTime", generalDetails.get("phenomenonTime"));
    pressureMessageDetails.put("resultTime", generalDetails.get("resultTime"));
    System.out.println("pressureMessageDetails: " + pressureMessageDetails);
    return pressureMessageDetails;
}).to("pressureTopic", produced);

StreamsConfig streamsConfig1 = new StreamsConfig(streamProperties1);
KafkaStreams kafkaStreams1 = new KafkaStreams(streamBuilder1.build(), streamsConfig1);
kafkaStreams1.start();

StreamsConfig streamsConfig2 = new StreamsConfig(streamProperties2);
KafkaStreams kafkaStreams2 = new KafkaStreams(streamBuilder2.build(), streamsConfig2);
kafkaStreams2.start();

// Close both streams cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    kafkaStreams1.close();
    kafkaStreams2.close();
}));
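Running one KafkaStreams application per field works, but all target topics can also be served by a single application: attach one mapValues/to branch per entry of a field-to-topic map to the same source KStream. A minimal sketch of the reusable extraction step, kept as a pure function (the topic names here are assumptions, adjust them to yours):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ResultFanOut {

    // Field name inside "result" -> target topic (assumed names).
    static final Map<String, String> TARGETS = new LinkedHashMap<>();
    static {
        TARGETS.put("Temperature", "temperatureTopic");
        TARGETS.put("Pressure", "pressureTopic");
        TARGETS.put("Humidity", "humidityTopic");
        TARGETS.put("Mean altitude", "meanAltitudeTopic");
    }

    // The per-record logic that each mapValues(...) branch applies:
    // copy one measurement out of the nested "result" map, keep the timestamps.
    static Map<String, Object> extract(Map<String, Object> value, String field) {
        @SuppressWarnings("unchecked")
        Map<String, Object> result = (Map<String, Object>) value.get("result");
        Map<String, Object> out = new HashMap<>();
        out.put(field, result.get(field));
        out.put("phenomenonTime", value.get("phenomenonTime"));
        out.put("resultTime", value.get("resultTime"));
        return out;
    }
}
```

Wired into a single topology this becomes `TARGETS.forEach((field, topic) -> source.mapValues(v -> ResultFanOut.extract(v, field)).to(topic, produced));` with one StreamsBuilder, one application.id and one KafkaStreams instance. Since mapValues never touches the key, the UUID key is carried unchanged onto every output topic.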

Also add streamProperties1.put("default.deserialization.exception.handler", "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"); to skip messages that fail deserialization (at least for testing purposes). - Vasyl Sarzhynskyi
A while back I had the same problem with a plain Kafka client and an ordinary JSON parser, and I had to drop everything before the first {: String result = record.value(); int i = result.indexOf("{"); result = result.substring(i); How can I do that in Kafka Streams? - Joe Daniel
I also added my Kafka client example, which uses JSON for the same data. - Joe Daniel
That is simply the value printed with System.out.print. Is the format correct? - Joe Daniel
I just ran replaceAll("\n", "").replaceAll("\\s+", "") and then tried to parse the JSON manually with JSONObject obj = (JSONObject) new JSONParser().parse(value.toString()); but I always get the error "Unexpected character ( ) at position 0." - Joe Daniel
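A note on the "Unexpected character at position 0" error: it usually means invisible characters (a BOM, a non-breaking space, or some prefix) sit in front of the first {. The indexOf trick from the earlier comment can be wrapped in a small helper and applied to the raw string before parsing, e.g. inside a mapValues step; a sketch, assuming the payload after the first brace is valid JSON:

```java
public class JsonCleaner {

    // Drop everything before the first '{' (BOM, stray whitespace, prefixes).
    // If no '{' is present, return the input unchanged so the caller can
    // decide how to handle the malformed record.
    static String stripToJson(String raw) {
        int i = raw.indexOf('{');
        return i >= 0 ? raw.substring(i) : raw;
    }
}
```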
