我可以帮助您进行翻译。以下是需要翻译的内容:
这是一个Json格式,我的第一个问题是,我不知道如何正确使用Json反序列化器或序列化器。
但我的主要目标是从主题的结果字段中创建温度、压力、湿度、平均高度等话题,并在温度话题中使用正确的值。
我该如何通过Kafka Streams实现这一点?希望你能帮助我更好地开始使用Kafka Streams。
编辑:
完整消息+键(格式化)
我有一个项目需要学习使用Kafka Streams,但我遇到了很大的麻烦。我正在使用kafka-streams版本1.0.1。 我们有一个主题流,其中包含以下样式的消息:
{
"phenomenonTime" : "2017-04-03T16:08:19.000Z",
"resultTime" : "2017-04-03T16:08:19.000Z",
"result" : {
"Temperature" : 0,
"Pressure" : 0,
"Humidity" : 0,
"Mean altitude" : 0,
"Mass PM2.5" : 7.4,
"Mass Error PM2.5" : 1.5,
"Mass PM10" : 12.3,
"Mass Error PM10" : 1.5
}
}
这是一个Json格式,我的第一个问题是,我不知道如何正确使用Json反序列化器或序列化器。
但我的主要目标是从主题的结果字段中创建温度、压力、湿度、平均高度等话题,并在温度话题中使用正确的值。
我该如何通过Kafka Streams实现这一点?希望你能帮助我更好地开始使用Kafka Streams。
编辑:
完整消息+键(格式化)
Key c45e9532-9810-11e8-8839-03e1e3365152
Value { "phenomenonTime" : "2017-04-03T16:08:09.000Z",
"resultTime" : "2017-04-03T16:08:09.000Z",
"result" : { "Temperature" : 0,
"Pressure" : 0,
"Humidity" : 0,
"Mean altitude" : 0,
"Mass PM2.5" : 7.1,
"Mass Error PM2.5" : 1.5,
"Mass PM10" : 9.6, "Mass Error PM10" : 1.5 },
"Datastream@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/Datastream",
"Datastream" : { "unitOfMeasurement" : { "name" : null, "symbol" : null, "definition" : null }, "@iot.id" : "geo.uni-augsburg.de/Fixed-Wing-UAV-1/Datastreams/LOAC_LOCAL_201704031605.mass" },
"FeatureOfInterest@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/FeatureOfInterest",
"FeatureOfInterest" : { "@iot.id" : "c458a1a4-9810-11e8-8839-176a6dbe6951" }, "@iot.id" : "c45e9532-9810-11e8-8839-03e1e3365152", "@iot.selfLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')" }
未格式化:
Key c45e9532-9810-11e8-8839-03e1e3365152
Value { "phenomenonTime" : "2017-04-03T16:08:09.000Z", "resultTime" : "2017-04-03T16:08:09.000Z", "result" : { "Temperature" : 0, "Pressure" : 0, "Humidity" : 0, "Mean altitude" : 0, "Mass PM2.5" : 7.1, "Mass Error PM2.5" : 1.5, "Mass PM10" : 9.6, "Mass Error PM10" : 1.5 }, "Datastream@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/Datastream", "Datastream" : { "unitOfMeasurement" : { "name" : null, "symbol" : null, "definition" : null }, "@iot.id" : "geo.uni-augsburg.de/Fixed-Wing-UAV-1/Datastreams/LOAC_LOCAL_201704031605.mass" }, "FeatureOfInterest@iot.navigationLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')/FeatureOfInterest", "FeatureOfInterest" : { "@iot.id" : "c458a1a4-9810-11e8-8839-176a6dbe6951" }, "@iot.id" : "c45e9532-9810-11e8-8839-03e1e3365152", "@iot.selfLink" : "http://localhost:8080/FROST-Server/v1.0/Observations('c45e9532-9810-11e8-8839-03e1e3365152')" }
但是Datastream@iot.navigationLink、Datastream等并不重要。关键是密钥必须相同。
它看起来就像这样 (https://i.imgur.com/zvwf3g7.png)
完整导出的数据流:
示例 Kafka 客户端:
streamProperties1.put("default.deserialization.exception.handler", "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
以跳过反序列化失败的消息(至少用于测试目的)。 - Vasyl Sarzhynskyi