我正在尝试编写一个简单的SCDF流程,从Kafka读取消息,根据特定值的存在过滤消息,并将数据推送到Mongo。作为其中的一部分,我不得不编写以下#jsonPath。
#jsonPath(payload,'$[?(@.metadata!=null)].metadata[?(@.trigger-routing!=
null)].trigger-routing') == {'1'}
我编写了一个示例测试,将运行SPeL并验证其返回值(注:我有意使用@EnableIntegration将SPeL功能与SCDF的相同配置连接起来,至少这是我的理论)
@SpringBootTest(classes = SpelTst.TestConfiguration.class)
public class SpelTst {
@Configuration
@EnableIntegration
public static class TestConfiguration {
}
@Autowired
IntegrationEvaluationContextFactoryBean factory;
@Test
public void test() throws JsonProcessingException {
final StandardEvaluationContext context = factory.getObject();
ExpressionParser parser = new SpelExpressionParser();
Expression exp = parser.parseExpression("#jsonPath(payload,'$[?(@.metadata!=null)].metadata[?(@.trigger-routing!= null)].trigger-routing') == {'1'}");
final PixelHitMessage sampleOne = new PixelHitMessage()
.setMetadata(ImmutableMap.of("trigger-routing", "1"))
.toChildType();
final PixelHitMessage sampleTwo = new PixelHitMessage()
.setMetadata(ImmutableMap.of("trigger-routing", ""))
.toChildType();
final PixelHitMessage sampleThree = new PixelHitMessage()
.setMetadata(Collections.emptyMap())
.toChildType();
final PixelHitMessage sampleFour = new PixelHitMessage()
.toChildType();
System.out.println(resolve(context, exp, sampleOne));
System.out.println(resolve(context, exp, sampleTwo));
System.out.println(resolve(context, exp, sampleThree));
System.out.println(resolve(context, exp, sampleFour));
}
private static Object resolve(StandardEvaluationContext context, Expression exp, PixelHitMessage sampleOne) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
final String payload = mapper.writerFor(PixelHitMessage.class).writeValueAsString(sampleOne);
System.out.println(payload);
final Message<String> kafkaMessage = MessageBuilder.withPayload(payload).build();
context.setRootObject(kafkaMessage);
return exp.getValue(context, Object.class);
}
}
当我运行这个程序时,会得到以下输出:
{"timestamp":"2020-06-26T19:31:38.013Z","level":"INFO","thread":"main","logger":"SpelTst","message":"Started SpelTst in 1.706 seconds (JVM running for 4.352)","context":"default"}
{"eventId":null,"postTime":null,"headers":null,"metadata":{"trigger-routing":"1"}}
true
{"eventId":null,"postTime":null,"headers":null,"metadata":{"trigger-routing":""}}
false
{"eventId":null,"postTime":null,"headers":null,"metadata":{}}
false
{"eventId":null,"postTime":null,"headers":null,"metadata":null}
false
以上就是我想要实现的确切行为。
但是当我在SCDF的过滤器组件中使用相同的SPeL时,我遇到了以下异常
Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['metadata']['trigger-routing']
应返回 false 的消息示例
{"eventId":"acb0afce-7782-4dc6-af09-4d6878fa8fd3","postTime":1593201189799,"headers":{"accept":"*/*","host":"localhost:7071","user-agent":"insomnia/2020.2.2"},"metadata":{}}
应该返回真的消息示例
{"eventId":"045698d4-d4dc-41b0-8bab-7c07ad58970a","postTime":1593201492866,"headers":{"accept":"*/*","host":"localhost:7071","user-agent":"insomnia/2020.2.2"},"metadata":{"trigger-routing":"1"}}
在SCDF中,SPeL仅适用于正面情况,路径上任何数据的缺失都会导致以上异常。我考虑使用Option.DEFAULT_PATH_LEAF_TO_NULL来处理JsonPath,但据我所知,没有办法通过Spring属性指定它(我检查了JsonPathUtils的代码,他们调用了使用默认上下文而不是具有默认(空)配置的JsonPath逻辑的版本)。
我还验证了筛选表达式是否正确部署(运行筛选应用程序的Pod的K8配置屏幕),看起来是正确的。
json-path
?此外,异常是否有更多的堆栈跟踪信息? - Kavithakaran Kanapathippillai