Logstash grok过滤器 - 动态命名字段

12

我有以下格式的日志行,并希望提取字段:

[field1: content1] [field2: content2] [field3: content3] ...

我既不知道字段名称,也不知道字段数量。

我尝试使用反向引用和sprintf格式,但没有得到结果:

match => [ "message", "(?:\[(\w+): %{DATA:\k<-1>}\])+" ] # not working
match => [ "message", "(?:\[%{WORD:fieldname}: %{DATA:%{fieldname}}\])+" ] # not working

这似乎只对一个字段有效,而不是多个字段:

match => [ "message", "(?:\[%{WORD:field}: %{DATA:content}\] ?)+" ]
add_field => { "%{field}" => "%{content}" }

kv过滤器也不适合,因为字段内容可能包含空格。

是否有插件/策略来解决这个问题?

3个回答

11

Logstash Ruby插件可以帮助您。 :)

以下是配置:

input {
    stdin {}
}

filter {
    ruby {
        code => "
            fieldArray = event['message'].split('] [')
            for field in fieldArray
                field = field.delete '['
                field = field.delete ']'
                result = field.split(': ')
                event[result[0]] = result[1]
            end
        "
    }
}

output {
    stdout {
        codec => rubydebug
    }
}

使用您的日志:

[field1: content1] [field2: content2] [field3: content3]

这是输出结果:

{
   "message" => "[field1: content1] [field2: content2] [field3: content3]",
  "@version" => "1",
"@timestamp" => "2014-07-07T08:49:28.543Z",
      "host" => "abc",
    "field1" => "content1",
    "field2" => "content2",
    "field3" => "content3"
}

我尝试了使用4个字段,它也有效。

请注意,ruby代码中的event是logstash事件。您可以使用它来获取所有事件字段,例如message,@timestamp等。

享受它!!!


6
我找到了另一种使用正则表达式的方法:
ruby {
    code => "
        fields = event['message'].scan(/(?<=\[)\w+: .*?(?=\](?: |$))/)
        for field in fields
            field = field.split(': ')
            event[field[0]] = field[1]
        end
    "
}

0

我知道这是一个旧帖子,但今天我刚看到它,所以我想提供一种替代方法。请注意,通常情况下,我几乎总是使用ruby过滤器,如前两个答案中建议的那样。然而,我认为我可以提供这个作为另一种选择。

如果有固定数量的字段或最大数量的字段(即可能少于三个字段,但永远不会超过三个字段),这也可以通过grokmutate过滤器的组合来完成。

# Test message is: `[fieldname: value]`
# Store values in [@metadata] so we don't have to explicitly delete them.
grok {
    match => {
        "[message]" => [
            "\[%{DATA:[@metadata][_field_name_01]}:\s+%{DATA:[@metadata][_field_value_01]}\]( \[%{DATA:[@metadata][_field_name_02]}:\s+%{DATA:[@metadata][_field_value_02]}\])?( \[%{DATA:[@metadata][_field_name_03]}:\s+%{DATA:[@metadata][_field_value_03]}\])?"
        ]
    }
}

# Rename the fieldname, value combinations. I.e., if the following data is in the message:
#
#     [foo: bar]
#
# It will be saved in the elasticsearch output as:
#
#    {"foo":"bar"}
#
mutate {
    rename => {
        "[@metadata][_field_value_01]" => "[%{[@metadata][_field_name_01]}]"
        "[@metadata][_field_value_02]" => "[%{[@metadata][_field_name_02]}]"
        "[@metadata][_field_value_03]" => "[%{[@metadata][_field_name_03]}]"
    }
    tag_on_failure => []
}

对于那些可能不太熟悉正则表达式的人来说,()? 中的捕获是可选的正则匹配,意味着如果没有匹配,表达式不会失败。mutate 过滤器中的 tag_on_failure => [] 选项确保如果重命名失败,则不会将任何错误附加到 tags,因为没有数据可以捕获,结果就没有字段可供重命名。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接