创建KSQL流:如何从复杂的JSON中提取值

7

我正在尝试在Apache/KAFKA KSQL中创建一个流。 该主题包含(相对复杂的JSON)。

{
  "agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
  "created_at": "2018-02-17 16:00:00.000Z",
  "id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
  "event_type": "data",
  "total_charged_amount": {
    "tax_free_amount": null,
    "tax_amounts": [],
    "tax_included_amount": {
      "amount": 0.0241,
      "currency": "EUR"
    }
  }
  "used_service_units": [
    {
      "amount": 2412739,
      "currency": null,
      "unit_of_measure": "bytes"
    }
  ]
}

现在,对于像事件类型和创建时间这样的简单内容,创建流非常容易。代码如下:

CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');

但是现在我需要访问使用的服务单元....我想从上面的JSON中提取“amount”。

我应该怎么做?

CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');

结果在...

line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...

如果我改为像这样创建一个流
CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');

然后像这样在流上执行 SQL SELECT:
SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;

这两个选择都不起作用...

这个选项给了我

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;'

Code generation failed for SelectValueMapper
1个回答

6

似乎解决这个问题的一种方法是将列数据类型设置为数组,即

CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');

现在我能做到以下事情:
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage

@RobinMoffatt,这种方法有文档记录吗?在https://www.confluent.io/product/ksql/的示例/食谱中找不到它,但这也是我的确切问题,所以可能这是一个更常见的要求。 - Jochem Schulenklopper
1
@JochemSchulenklopper 很高兴写下来,但首先需要更详细地了解具体问题 - 你能联系我吗? robin@confluent.io。谢谢。 - Robin Moffatt
1
@RobinMoffatt,如果JSON文档包含更多层、更多字典、数组中的更多项或字典中的数组,那么该部分需要发生什么变化?在我的情况下,我正在解析公共SaaS提供商通过Webhook发送的事件数据,JSON文档相当平坦。我猜测(也许是希望)有一些查询语言可以表达“多级路径”,就像XPath的JSON变体,或者针对KSQL中的JSON文档的类似CSS的选择器....但我在KSQL的文档中找不到这样的东西。(指向文档的指针同样有价值。) - Jochem Schulenklopper
1
@JochemSchulenklopper 在 Confluent 上看到了一篇关于轻度嵌套 JSON 的文章 https://www.confluent.io/stream-processing-cookbook/ksql-recipes/nested-json-data。对于我的使用来说过于简单,但它可能会对你有所帮助。 - Ajay M

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接