Spring Cloud Data Flow过滤器组件中的JsonPath无法正常工作

15

我正在尝试编写一个简单的SCDF流程,从Kafka读取消息,根据特定值的存在过滤消息,并将数据推送到Mongo。作为其中的一部分,我不得不编写以下#jsonPath。

 #jsonPath(payload,'$[?(@.metadata!=null)].metadata[?(@.trigger-routing!=
 null)].trigger-routing') == {'1'}

我编写了一个示例测试,将运行SPeL并验证其返回值(注:我有意使用@EnableIntegration将SPeL功能与SCDF的相同配置连接起来,至少这是我的理论)

@SpringBootTest(classes = SpelTst.TestConfiguration.class)
public class SpelTst {

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

    }

    @Autowired
    IntegrationEvaluationContextFactoryBean factory;

    @Test
    public void test() throws JsonProcessingException {
        final StandardEvaluationContext context = factory.getObject();
        ExpressionParser parser = new SpelExpressionParser();
        Expression exp = parser.parseExpression("#jsonPath(payload,'$[?(@.metadata!=null)].metadata[?(@.trigger-routing!= null)].trigger-routing') == {'1'}");

        final PixelHitMessage sampleOne = new PixelHitMessage()
                .setMetadata(ImmutableMap.of("trigger-routing", "1"))
                .toChildType();

        final PixelHitMessage sampleTwo = new PixelHitMessage()
                .setMetadata(ImmutableMap.of("trigger-routing", ""))
                .toChildType();

        final PixelHitMessage sampleThree = new PixelHitMessage()
                .setMetadata(Collections.emptyMap())
                .toChildType();

        final PixelHitMessage sampleFour = new PixelHitMessage()
                .toChildType();


        System.out.println(resolve(context, exp, sampleOne));
        System.out.println(resolve(context, exp, sampleTwo));
        System.out.println(resolve(context, exp, sampleThree));
        System.out.println(resolve(context, exp, sampleFour));
    }

    private static Object resolve(StandardEvaluationContext context, Expression exp, PixelHitMessage sampleOne) throws JsonProcessingException {
        final ObjectMapper mapper = new ObjectMapper();
        final String payload = mapper.writerFor(PixelHitMessage.class).writeValueAsString(sampleOne);
        System.out.println(payload);

        final Message<String> kafkaMessage = MessageBuilder.withPayload(payload).build();

        context.setRootObject(kafkaMessage);

        return exp.getValue(context, Object.class);
    }

}

当我运行这个程序时,会得到以下输出:

{"timestamp":"2020-06-26T19:31:38.013Z","level":"INFO","thread":"main","logger":"SpelTst","message":"Started SpelTst in 1.706 seconds (JVM running for 4.352)","context":"default"}

{"eventId":null,"postTime":null,"headers":null,"metadata":{"trigger-routing":"1"}}
true
{"eventId":null,"postTime":null,"headers":null,"metadata":{"trigger-routing":""}}
false
{"eventId":null,"postTime":null,"headers":null,"metadata":{}}
false
{"eventId":null,"postTime":null,"headers":null,"metadata":null}
false

以上就是我想要实现的确切行为。

但是当我在SCDF的过滤器组件中使用相同的SPeL时,我遇到了以下异常

Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['metadata']['trigger-routing']

应返回 false 的消息示例

{"eventId":"acb0afce-7782-4dc6-af09-4d6878fa8fd3","postTime":1593201189799,"headers":{"accept":"*/*","host":"localhost:7071","user-agent":"insomnia/2020.2.2"},"metadata":{}}

应该返回真的消息示例

{"eventId":"045698d4-d4dc-41b0-8bab-7c07ad58970a","postTime":1593201492866,"headers":{"accept":"*/*","host":"localhost:7071","user-agent":"insomnia/2020.2.2"},"metadata":{"trigger-routing":"1"}}

在SCDF中,SPeL仅适用于正面情况,路径上任何数据的缺失都会导致以上异常。我考虑使用Option.DEFAULT_PATH_LEAF_TO_NULL来处理JsonPath,但据我所知,没有办法通过Spring属性指定它(我检查了JsonPathUtils的代码,他们调用了使用默认上下文而不是具有默认(空)配置的JsonPath逻辑的版本)。

我还验证了筛选表达式是否正确部署(运行筛选应用程序的Pod的K8配置屏幕),看起来是正确的。

K8 config screenshot


正在使用哪个版本的 json-path?此外,异常是否有更多的堆栈跟踪信息? - Kavithakaran Kanapathippillai
你可以尝试使用其他工具(例如 jq https://stedolan.github.io/jq/)来过滤你的 JSON 文件,然后进行比较吗?另外,请确保你正在使用相同的数据集测试两个解决方案。 - davidmpaz
1个回答

0

这种解决方案可以使用。

    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.7.0</version>
    </dependency>

那么验证可以是这样的

    DocumentContext documentContext = JsonPath.parse(messagePayload);
    Map<String, Object> metadata = documentContext.read("$.metadata");

对于 DocumentContextJsonPath,它们是导入的。

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;

现在进入真正的场景

System.out.println(metadata);

输出将为:{trigger-routing=1}

对于错误情况

System.out.println(metadata);

输出将是:{}

其他 JSON Path 语法包括 - https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接