使用Apache Flink SQL从Kafka消息中获取嵌套字段

8

我正在尝试使用Apache Flink 1.11创建一个源表,以便可以访问JSON消息中的嵌套属性。我可以获取根属性的值,但不确定如何访问嵌套对象。

文档建议应该使用MAP类型,但是当我设置它时,出现以下错误:

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

这是我的SQL查询语句

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

我的 JSON 数据看起来像这样:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}
4个回答

8
您可以使用ROW来提取JSON消息中的嵌套字段。您的DDL语句应该类似于:
CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );

1
太好了,这个可行!那么对于深度嵌套的JSON,似乎这可能会变得难以处理?在使用PyFlink和SQL处理JSON数据方面,有更好的方法吗? - bash721
1
目前来说,我认为没有更好的方法。不过,Flink SQL 很快就会支持 SQL JSON 函数,这应该会让事情变得更加容易!具体信息请参考:https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550 - morsapaes

7

[2022 更新]

在 Apache Flink 1.13 版本中,没有内置的 JSON 函数。 它们在 1.14 版本中引入。请查看this

如果您使用的是版本小于 1.14,则请参考以下解决方案。

如何创建带有嵌套 JSON 输入的表格?

JSON 输入示例:

{
    "id": "message-1",
    "title": "Some Title",
    "properties": {
        "foo": "bar",
        "nested_foo":{
            "prop1" : "value1",
            "prop2" : "value2"
        }
    }
}

创建语句
CREATE TABLE input(
                id VARCHAR,
                title VARCHAR,
                properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );

如何选择嵌套列?
SELECT properties.foo, properties.nested_foo.prop1 FROM input;

请注意,如果您输出结果时,请注意
SELECT properties FROM input

你以行格式看到结果。列properties的内容将会是什么。
+I[bar, +I[prop1,prop2]]

1

您也可以尝试

CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP<STRING, STRING>
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

唯一的区别是:MAP<STRING, STRING>MAP

1
如果使用format=raw,您可以使用JSON_VALUE函数从payload中提取感兴趣的字段。以下是代码:
 CREATE TABLE input(
        payload STRING,
        foo AS JSON_VALUE(payload, '$.properties.foo' RETURNING STRING),
) WITH (
  'connector' = 'kafka',        
  'format' = 'raw'
)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接