使用Logstash解析包含Python跟踪的日志

8

我一直在尝试使用Logstash解析我的Python回溯日志。我的日志如下:

[pid: 26422|app: 0|req: 73/73] 192.168.1.1 () {34 vars in 592 bytes} [Wed Feb 18 13:35:55 2015] GET /data => generated 2538923 bytes in 4078 msecs (HTTP/1.1 200) 2 headers in 85 bytes (1 switches on core 0)
Traceback (most recent call last):
  File "/var/www/analytics/parser.py", line 257, in parselogfile
    parselogline(basedir, lne)
  File "/var/www/analytics/parser.py", line 157, in parselogline
    pval = understandpost(parts[3])
  File "/var/www/analytics/parser.py", line 98, in understandpost
    val = json.loads(dct["events"])
  File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode
    obj, end = self.scan_once(s, idx)
ValueError: Unterminated string starting at: line 1 column 355 (char 354)

到目前为止,我已经能够解析日志,除了最后一行,即

ValueError: Unterminated string starting at: line 1 column 355 (char 354)

我使用多行过滤器来实现此功能。我的logstash配置大致如下:
filter {

    multiline {
        pattern => "^Traceback"
        what => "previous"
    }

    multiline {
        pattern => "^ "
        what => "previous"
    }


    grok {
        match => [
            "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
            ]
    }

    if "_grokparsefailure" in [tags] {
        multiline {
            pattern => "^.*$"
            what => "previous"
            negate => "true"
        }
    }

    if "_grokparsefailure" in [tags] {
        grok {
            match => [
                  "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
        ]
            remove_tag => ["_grokparsefailure"]
        }
    }
}

但是我的最后一行没有解析。相反,它仍然给我一个错误,并且也使处理时间呈指数增长。有没有关于如何解析traceback的最后一行的建议?


我以前从未见过有三个多行配置的配置文件。通常,您会找到标识一个部分开头的模式(对于您来说可能是“[pid”或“Traceback”),然后将所有内容组合到下一个出现的消息中。 - Alain Collins
做了一些功课,使用一个多行过滤器解决了它,我只需要识别以[开头的第一行作为我的日志标识,其他行则使用多行过滤器进行追加。如果有人需要解析Python日志,我会在这里发布解决方案。 - Keshav Agarwal
同时,三个多行过滤器使用了太多的处理能力,导致logstash变慢,而只使用一个这样的过滤器,它就像魔法一样工作! - Keshav Agarwal
1个回答

8
好的,我找到了一个解决方案。我采用的方法是忽略以“ [”开头的日志消息,并将所有其他行追加到上一条消息的末尾。然后可以应用 grok 过滤器并解析 traceback。请注意,我必须应用两个 grok 过滤器:
  1. 当有带有 GREEDYDATA 的 traceback 时,获取 traceback。
  2. 当没有 traceback 时,GREEDYDATA 解析将失败,我必须删除 _grokparsefailure 标记,然后再次应用不带有 GREEDYDATA 的 grok。这是使用 if 语句完成的。
最终的 logstash 过滤器看起来像这样:
filter {

    multiline {
        pattern => "^[^\[]"
        what => "previous"
    }



    grok {
        match => [
            "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
        ]
    }

    if "_grokparsefailure" in [tags] {
        grok {
            match => [
            "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)"
                ]
            remove_tag => ["_grokparsefailure"]
        }
    }

    else {
        mutate {
            convert => {"traceback" => "string"}
        }
    }

    date {
        match => ["timestamp", "dd/MM/YYYY:HH:MM:ss Z"]
        locale => en
    }
    geoip {
        source => "clientip"
    }
    useragent {
        source => "agent"
        target => "Useragent"
    }
}

或者,如果您不想使用if块来检查另一个grok模式并删除_grokparsefailure,则可以使用第一个grok过滤器来检查两种消息类型,方法是在grok过滤器的match数组中包含多个消息模式检查。可以像这样完成:

        grok {
            match => [
            "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)",
            "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
                ]
        }

还有第三种方法(可能是最优雅的方法)。它看起来像这样:

grok {
    match => [
        "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)(%{GREEDYDATA:traceback})?"
    ]
}

请注意,在这种方法中,存在可选字段必须用"()?"括起来。在这里,(%{GREEDYDATA:traceback})?
因此,Grok过滤器会看到如果该字段可用,则会解析该字段。否则,它将被跳过。

多行看起来更好!请注意,您可以使模式中的GREEDYDATA部分变为可选。(foo)?是一般语法,我相信。 - Alain Collins
让我来搜索一下,总有一个更简单、更小的方法是可行的!;) - Keshav Agarwal
找到了一个。它不使用foo,相反我们可以在第一个grok过滤器中定义多个消息匹配模式,从而消除if块的需要。我会编辑我的答案来引用这种方法。 - Keshav Agarwal
将其作为模式的可选项会比在grok中列出两个略有不同的模式更容易维护。重要的不是“foo”,而是“( )?”。 - Alain Collins
我尝试了一下,是的,它起作用了。确实是一种优雅的方法!再次编辑答案以引用这种方法。 - Keshav Agarwal
这个模式对我也起作用了。但是由于我将使用Filebeat与Logstash,文档建议在Filebeat上管理多行消息 - bwdm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接