Logstash: Keeping a value across events

Posted 2019-02-23 07:03

I have a date that appears only once in each log file, and I am trying to add it to every event that follows once it has been matched, so that it acts somewhat like a global variable. (The date is at the top of the file, and I cannot use multiline or change the file name or content.)

For this, my approach is to use a grep filter with drop => false.

grok {
    patterns_dir => "[...]"
    match => [ "message", "%{DATELINE}" ]
    tag_on_failure => [ ]
}
grep {
    add_field => { "grepdate" => "%{mydate}" }
    drop => false
}
date {
    locale => "en"
    timezone => "Europe/Paris"
    match => [ "grepdate", "yyyyMMdd" ]
    target => "grepdate"
}

Regular expression:

DATELINE (= Date: (?<mydate>[0-9]{8}))

What I notice is that the grepdate field is correctly added to all events, which is what I want. However, its value is not the date itself (the value of %{mydate}) but the literal string "%{mydate}". The only exception is the event where the pattern actually matches: when the date line in my log file is parsed, the grepdate field contains the correct value.

What can I do to fix this?

Any help is greatly appreciated.

Edit:

I am now trying a solution that uses the memorize plugin. However, I am getting the following error:

Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: memorize

Is there a way to make this filter thread-safe?

1 answer
趁早两清
Reply #2 · 2019-02-23 07:36

Maybe you should use the official aggregate filter for this, since memorize is not official and will not work with Logstash >2.0.

It would go like this:

# same grok as yours, except that lines without a date now get tagged
grok {
    patterns_dir => "[...]"
    match => [ "message", "%{DATELINE}" ]
    tag_on_failure => [ "not_date_line" ]
}
# add a fictional taskId field to correlate all lines
mutate {
   add_field => { "taskId" => "all" }
}

# if we're processing the first line, remember the date
if "not_date_line" not in [tags] {
    aggregate {
        task_id => "%{taskId}"
        code => "map['mydate'] = event['mydate']"
    }
} 
# if we're processing the next lines, add the date
else {
    aggregate {
        task_id => "%{taskId}"
        code => "event['mydate'] = map['mydate']"
        map_action => "update"
        timeout => 0
    }
}

All your events will then have a mydate field with the date that was on the first log line.
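
One caveat if you are on a newer release: from Logstash 5.0 onwards the event['field'] syntax is no longer available inside Ruby code, and fields are read and written with event.get and event.set instead. A rough sketch of the same conditional for those versions could look like the following. It assumes the grok and mutate blocks above stay unchanged, and it reuses the date filter from the question, pointed at mydate, which is my assumption about what you want to end up with. I left out timeout here, so check the aggregate documentation for how your version handles it.

```
# if we're processing the date line, remember the date
if "not_date_line" not in [tags] {
    aggregate {
        task_id => "%{taskId}"
        # Logstash 5.0+ event API: read the field with event.get
        code => "map['mydate'] = event.get('mydate')"
    }
}
# if we're processing the next lines, add the date
else {
    aggregate {
        task_id => "%{taskId}"
        # Logstash 5.0+ event API: write the field with event.set
        code => "event.set('mydate', map['mydate'])"
        map_action => "update"
    }
}

# optional: parse the remembered value into a real timestamp
# (same settings as the date filter in the question, applied to mydate)
date {
    locale => "en"
    timezone => "Europe/Paris"
    match => [ "mydate", "yyyyMMdd" ]
    target => "mydate"
}
```

Note that this approach depends on events being processed in order, so the aggregate documentation recommends running Logstash with a single filter worker (the -w 1 flag).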
