Logstash Custom Log Filter for Apache Logs

Posted 2019-09-15 03:44

I am new to the ELK stack. I have a Filebeat service sending logs to Logstash, where a grok filter parses them before the data is pushed to an Elasticsearch index.

I am using the grok filter with match => { "message" => "%{COMBINEDAPACHELOG}" } to parse the data.

My issue is that I want the field names and their values stored in the Elasticsearch index. The different versions of my logs are shown below:


27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atm&explain=true&bridge=true HTTP/1.1" 200 3284
27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atms&explain=true&bridge=true HTTP/1.1" 200 1452
27.60.18.21 - - [27/Aug/2017:10:28:52 +0530] "GET /api/v1.2/places/nearby/json?&refLocation=28.5359586,77.3677936&keyword=FINATM HTTP/1.1" 200 3283
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=co&explain=true&bridge=true HTTP/1.1" 200 3415
27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=cof&explain=true&bridge HTTP/1.1" 200 2476

The fields I want in the Elasticsearch index are as follows:

  1. client_ip => type must be compatible with what Kibana uses for IP mapping.
  2. timestamp => datetime format => the time of the log entry
  3. method => text => the method that was called, e.g. GET, POST
  4. version => decimal number => e.g. 1.2 / 1.0 (in the sample logs as v1.2)
  5. username => text => the text after username= (in the sample logs as pradeep.pgu)
  6. location => geo_point type => the value has both latitude and longitude, so that Kibana can plot these on the map.
  7. search_query => text => the term that was searched for (in the samples, from either of the two fields keyword= or query=). Only one of the two fields will be present, and the value of whichever is present must be used.
  8. response_code => number => the code of the response (in the samples, 200)
  9. data_transfered => number => the amount of data transferred (the last number in the sample).

Is such a thing even possible? Does the grok filter have a provision for this? The complication is that the query parameters are not in a fixed order.
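
For reference, this is roughly the index mapping I am aiming for (the template and index names here are just placeholders I made up):

PUT _template/apache_api_logs
{
  "index_patterns": ["apache-api-*"],
  "mappings": {
    "properties": {
      "client_ip":       { "type": "ip" },
      "timestamp":       { "type": "date", "format": "dd/MMM/yyyy:HH:mm:ss Z" },
      "method":          { "type": "keyword" },
      "version":         { "type": "float" },
      "username":        { "type": "keyword" },
      "location":        { "type": "geo_point" },
      "search_query":    { "type": "text" },
      "response_code":   { "type": "integer" },
      "data_transfered": { "type": "long" }
    }
  }
}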

1 Answer

萌系小妹纸
2019-09-15 04:10

Starting from the HTTPD_COMMONLOG pattern, you could use this grok pattern (which you can test with an online grok tester):

grok {
    match => {
        "message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/places/search/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
    }
}
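
Applied to the first sample line, this pattern yields client_ip = 27.60.18.21, timestamp = 27/Aug/2017:10:28:49 +0530, method = GET, version = 1.2, request = username=pradeep.pgu&location=28.5359586,77.3677936&query=atm&explain=true&bridge=true, response_code = 200 and data_transfered = 3284. Note that this first version only matches the /places/search/ endpoint; the complete filter below generalizes the path so the nearby endpoint matches too.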

Once the grok filter has extracted the request field, you can run the kv filter on it, which will extract the query parameters (and sidestep the problem of the parameters not appearing in a fixed order). You'll have to set the field_split option to &:

kv { 
  source => "request"
  field_split => "&"
}
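
For the first sample line, the kv filter would turn the request field into username = pradeep.pgu, location = 28.5359586,77.3677936, query = atm, explain = true and bridge = true (kv splits on & and, by default, on = within each pair).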

For search_query, depending on which field is present, we use the mutate filter with the add_field option to create the field. Putting it all together:

filter {
    grok {
        # Parse the Apache access log line; the API path is generalized
        # to .*/json so that both the search and nearby endpoints match.
        match => {
            "message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/.*/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"
        }
    }
    # Split the query string into one field per parameter, regardless of order.
    kv {
        source => "request"
        field_split => "&"
    }

    # Whichever of query/keyword is present becomes search_query.
    if [query] {
        mutate {
            add_field => { "search_query" => "%{query}" }
        }
    } else if [keyword] {
        mutate {
            add_field => { "search_query" => "%{keyword}" }
        }
    }

    # The nearby endpoint sends refLocation instead of location.
    if [refLocation] {
        mutate {
            rename => { "refLocation" => "location" }
        }
    }
}
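
Everything grok and kv extract is a string, so to get the types you asked for you would still have to parse the timestamp and convert the numeric fields. A minimal sketch of the extra filters, to be added inside the same filter block (these are the standard date and mutate options; nothing here is specific to your setup):

date {
    # Parse the Apache timestamp; by default the result goes into @timestamp.
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate {
    convert => {
        "version"         => "float"
        "response_code"   => "integer"
        "data_transfered" => "integer"
    }
}

For location to be plottable on a Kibana map, the index mapping must declare it as geo_point (as in the template sketched in the question); Elasticsearch will then accept the lat,lon string that the kv filter produces.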