我正在尝试在Apache/KAFKA KSQL中创建一个流。 该主题包含(相对复杂的JSON)。
{
"agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
"created_at": "2018-02-17 16:00:00.000Z",
"id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
"event_type": "data",
"total_charged_amount": {
"tax_free_amount": null,
"tax_amounts": [],
"tax_included_amount": {
"amount": 0.0241,
"currency": "EUR"
}
}
"used_service_units": [
{
"amount": 2412739,
"currency": null,
"unit_of_measure": "bytes"
}
]
}
现在,对于像事件类型和创建时间这样的简单内容,创建流非常容易。代码如下:
CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');
但是现在我需要访问使用的服务单元....我想从上面的JSON中提取“amount”。
我应该怎么做?
CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');
结果在...
line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...
如果我改为像这样创建一个流
CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');
然后像这样在流上执行 SQL SELECT:
SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;
这两个选择都不起作用...
这个选项给了我
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;'
Code generation failed for SelectValueMapper