如何使用Logstash过滤器处理多行日志条目?

28

背景:

我有一个自定义生成的日志文件,其具有以下模式:

[2014-03-02 17:34:20] - 127.0.0.1|ERROR| E:\xampp\htdocs\test.php|123|subject|The error message goes here ; array (
  'create' => 
  array (
    'key1' => 'value1',
    'key2' => 'value2',
    'key3' => 'value3'
  ),
)
[2014-03-02 17:34:20] - 127.0.0.1|DEBUG| flush_multi_line

第二个条目 [2014-03-02 17:34:20] - 127.0.0.1|DEBUG| flush_multi_line 是一个虚拟行,只是为了让logstash知道多行事件已经结束,此行稍后会被删除。

我的配置文件如下:

input {
  stdin{}
}

filter{
  multiline{
      pattern => "^\["
      what => "previous"
      negate=> true
  }
  grok{
    match => ['message',"\[.+\] - %{IP:ip}\|%{LOGLEVEL:loglevel}"]
  }

  if [loglevel] == "DEBUG"{ # the event flush  line
    drop{}
  }else if [loglevel] == "ERROR"  { # the first line of multievent
    grok{
      match => ['message',".+\|.+\| %{PATH:file}\|%{NUMBER:line}\|%{WORD:tag}\|%{GREEDYDATA:content}"] 
    }
  }else{ # its a new line (from the multi line event)
    mutate{
      replace => ["content", "%{content} %{message}"] # Supposing each new line will override the message field
    }
  }  
}

output {
  stdout{ debug=>true }
}

“content”字段的输出结果为:The error message goes here; array(

问题:

我的问题是我想把其余的多行存储到内容字段中:

The error message goes here ; array (
  'create' => 
  array (
    'key1' => 'value1',
    'key2' => 'value2',
    'key3' => 'value3'
  ),
)

所以我可以稍后删除消息字段。

@message 字段包含整个多行事件,因此我尝试使用 mutate 过滤器 并对其使用 replace 函数,但我无法使其工作 :(。

我不理解 Multiline 过滤器的工作方式,如果有人能为此提供一些帮助,将不胜感激。

谢谢,

Abdou。

4个回答

13
我查看了源代码并发现:
  • 多行过滤器会取消所有被视为等待事件的后续事件,然后将该行附加到原始消息字段中,这意味着任何在多行过滤器之后的过滤器在此情况下都不适用。
  • 唯一可以通过过滤器的事件是被视为新事件的事件(在我的情况下以 [开头)。

以下是工作代码:

input {
   stdin{}
}  

filter{
      if "|ERROR|" in [message]{ #if this is the 1st message in many lines message
      grok{
        match => ['message',"\[.+\] - %{IP:ip}\|%{LOGLEVEL:loglevel}\| %{PATH:file}\|%{NUMBER:line}\|%{WORD:tag}\|%{GREEDYDATA:content}"]
      }

      mutate {
        replace => [ "message", "%{content}" ] #replace the message field with the content field ( so it auto append later in it )
        remove_field => ["content"] # we no longer need this field
      }
    }

    multiline{ #Nothing will pass this filter unless it is a new event ( new [2014-03-02 1.... )
        pattern => "^\["
        what => "previous"
        negate=> true
    }

    if "|DEBUG| flush_multi_line" in [message]{
      drop{} # We don't need the dummy line so drop it
    }
}

output {
  stdout{ debug=>true }
}

您好,

阿卜杜


1
我对这个答案进行了负评,但给你的问题点了赞。这个信息可能曾经是正确的,但现在不再正确了(至少对于1.4.2版本来说是如此)。考虑接受@sbange的答案,那是唯一一个对我有效的答案。 - CrazyPyro

12

在这个问题 https://logstash.jira.com/browse/LOGSTASH-509 中提到了 grok 和多行处理。

只需在您的 grok 正则表达式前面添加 "(?m)",就不需要进行变异。以下是来自该问题的示例:

pattern => "(?m)<%{POSINT:syslog_pri}>(?:%{SPACE})%{GREEDYDATA:message_remainder}"

1
是的!当其他方法都不行时,这个对我有用。我认为你的 pattern => 应该改成 grok{ match =>。为了完整起见,考虑编辑并包括 @Thales Ceolin 的评论以及原始问题中的实际 multiline 块。这样人们就可以在这个答案中获得一个即插即用的解决方案。 - CrazyPyro

6
多行过滤器将在消息中添加“\n”。例如:
"[2014-03-02 17:34:20] - 127.0.0.1|ERROR| E:\\xampp\\htdocs\\test.php|123|subject|The error message goes here ; array (\n  'create' => \n  array (\n    'key1' => 'value1',\n    'key2' => 'value2',\n    'key3' => 'value3'\n  ),\n)"

然而,grok过滤器无法解析“\n”。因此,您需要将\n替换为另一个字符,比如空格。
mutate {
    gsub => ['message', "\n", " "]
}

然后,grok模式可以解析消息。例如:
 "content" => "The error message goes here ; array (   'create' =>    array (     'key1' => 'value1',     'key2' => 'value2',     'key3' => 'value3'   ), )"

谢谢你的回答Ben,但是你的代码由于我在我的回答中提到的原因而无法工作。 - emonik
实际上,我已经使用了您的配置和日志,它对我有效!您需要在多行之后添加gsub过滤器。 - Ban-Chuan Lim
确实,你的代码可以工作,感谢你提供关于 grok 技巧的信息。但是我更愿意使用我的答案中的代码,因为在将消息附加到之前,我需要更多的控制和编辑。 所以这就是我要标记为答案的那个,很遗憾我没有足够的声望来赞同你的答案 :( 非常感谢你的帮助,谢谢。 - emonik
1
欢迎你。:). 如果你有任何问题,请在这里问。我已经为你点赞了。你的回答太棒了! - Ban-Chuan Lim

1
问题不就是过滤器的排序吗?在logstash中,顺序非常重要。您不需要另一行来指示已完成输出多行日志行。只需确保多行过滤器出现在grok之前即可(见下文)。
附注:我已成功解析了一个多行日志行,其中xml附加到日志行的末尾,并跨越多行,但我仍然将一个漂亮干净的xml对象放入了我的内容等效变量中(以下命名为xmlrequest)。在您提到在日志中记录xml之前...我知道...这不是理想的...但那是另一个辩论的问题 :)):
filter { 
multiline{
        pattern => "^\["
        what => "previous"
        negate=> true
    }

mutate {
    gsub => ['message', "\n", " "]
}

mutate {
    gsub => ['message', "\r", " "]
}

grok{
        match => ['message',"\[%{WORD:ONE}\] \[%{WORD:TWO}\] \[%{WORD:THREE}\] %{GREEDYDATA:xmlrequest}"]
    }

xml {
source => xmlrequest
remove_field => xmlrequest
target => "request"
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接