Summary:
Use a filter in Logstash that reads a value from an event field, looks that value up in an external file (e.g. a CSV), and adds the matching value from the external file as an extra field on the event.
More info:
I have a log file with events. The events look like:
{"@timestamp":"2014-06-18T11:52:45.370636+02:00","location":{"MainId":3,"SubId":"5"},"EndRequest":{"Duration":{"Main":0,"Page":6720}}}
I have a static csv file like:
1,left
2,right
3,top
When an event is processed in Logstash, I want a filter that checks the value of MainId (in the example event, 3) and looks it up in the CSV file. If found, the event must get a tag: "top".
It's similar to the GeoIP filter: the event has a field value, the filter matches that value in a "database", and the values returned can be added to the event.
I was not able to find an existing filter that does the above. Do I need to write a custom filter myself? If so, can someone give a hint on how to approach this?
I've never seen a plugin written for it, so I went ahead and wrote a very basic one:
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "csv"

# The csvlookup filter allows you to add fields to an event
# based on a csv file
class LogStash::Filters::CSVLookup < LogStash::Filters::Base
  config_name "csvlookup"
  milestone 1

  # Example:
  #
  # filter {
  #   csvlookup {
  #     file => 'key_value.csv'
  #     key_col => 1
  #     value_col => 2
  #     default => 'some_value'
  #     map_field => { "from_field" => "to_field" }
  #   }
  # }
  #
  # The default is used if the key_col's value is not present in the CSV file.
  config :file, :validate => :string, :required => true
  config :key_col, :validate => :number, :default => 1, :required => false
  config :value_col, :validate => :number, :default => 2, :required => false
  config :default, :validate => :string, :required => false
  config :map_field, :validate => :hash, :required => true

  public
  def register
    # Build the lookup table once, at startup.
    @lookup = Hash.new
    CSV.foreach(@file) do |row|
      @lookup[row[@key_col - 1]] = row[@value_col - 1]
    end
  end # def register

  public
  def filter(event)
    return unless filter?(event)
    @map_field.each do |src_field, dest_field|
      looked_up_val = @lookup[event[src_field].to_s]
      if looked_up_val.nil?
        event[dest_field] = @default unless @default.nil?
      else
        if event[dest_field].nil?
          event[dest_field] = looked_up_val
        elsif !event[dest_field].is_a?(Array)
          event[dest_field] = [ event[dest_field], looked_up_val ]
        else
          event[dest_field].push(looked_up_val)
        end
      end
    end
  end # def filter
end # class LogStash::Filters::CSVLookup
There is further work that could be done on it -- for example, if the src_field was an array it could iterate over it -- but it should work as-is for your case.
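For your example event, a configuration along these lines should work (the file path and the destination field name `position` are placeholders; the `[location][MainId]` field-reference syntax reaches the nested field):

```
filter {
  csvlookup {
    file => "/path/to/key_value.csv"
    key_col => 1
    value_col => 2
    map_field => { "[location][MainId]" => "position" }
  }
}
```

With your CSV, an event whose `location.MainId` is 3 would get `position` set to "top".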
There is the Translate filter.
Instead of a CSV file you give it a YAML dictionary, and for single key-value pairs converting the CSV to YAML is an easy sed one-liner.
Latest doc at time of writing: http://logstash.net/docs/1.4.2/filters/translate
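A sketch of that sed conversion, using the CSV from the question (filenames are assumed):

```shell
# Recreate the question's two-column CSV.
printf '1,left\n2,right\n3,top\n' > key_value.csv

# Turn each "key,value" line into a quoted YAML "key": "value" pair.
sed 's/^\([^,]*\),\(.*\)$/"\1": "\2"/' key_value.csv > key_value.yml

cat key_value.yml
# "1": "left"
# "2": "right"
# "3": "top"
```

The resulting key_value.yml can then be used as the translate filter's `dictionary_path`, with `field` pointing at the event field to look up and `destination` naming the field to add (see the doc linked above for the exact options).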