Using field as input to Logstash Grok filter pattern

2019-08-03 04:12发布

问题:

I'm wondering if it is possible to use a field in the Logstash message as the input to the Grok pattern. Say I have an entry that looks like:

{
    "message":"10.1.1.1",
    "grok_filter":"%{IP:client}"
}

I want to be able to do something like this:

filter {
  grok {
    match => ["message", ["%{grok_filter}"]]
  }
}

The problem is this crashes Logstash as it appears to treat "%{grok_filter}" as the Grok filter itself instead of the value of grok_filter. I get the following after Logstash has crashed:

The error reported is: 
  pattern %{grok_filter} not defined

Is there any way to get the value of a field from inside the Grok filter block and use that as the input to the Grok pattern?

回答1:

The answer is no -- the grok filter compiles its pattern when the filter is initialized. If you need to do something like that you'll have to write your own filter that compiles the pattern every time (and pay the performance penalty).

Without knowing more about why you want to do this, it's hard to recommend the best course of action. If you have a limited number of patterns, you can just set a grok_filter_type parameter and then have a bunch of if [grok_filter_type] == 'ip' { grok { ... } } type of things.

Here's a custom filter that will allow you to do what you want -- it's mostly a copy of the grok code, but there are some changes/simplifications. I've tested it and it seems to work for me.

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/environment"
require "set"

# A version of grok that can parse from a log-defined pattern.  Not really
# recommended for high usage patterns, but for the occasional pattern it
# should work
#     filter {
#       grok_dynamic {
#         match_field => "message"
#         pattern_field => "message_pattern"
#       }
#     }
#
# The pattern is read from the event and recompiled for every event, which is
# what makes this slower than the stock grok filter. An event whose pattern
# cannot be compiled is tagged with `tag_on_failure` instead of raising.
#
# NOTE(review): a single Grok instance is recompiled per event and kept in
# @grok; presumably this is unsafe with more than one filter worker -- confirm
# before enabling multiple workers.
class LogStash::Filters::GrokDynamic < LogStash::Filters::Base
  config_name "grok_dynamic"
  milestone 1

  # The field that contains the data to match against
  config :match_field, :validate => :string, :required => true
  # the field that contains the pattern
  config :pattern_field, :validate => :string, :required => true
  # where the patterns are
  config :patterns_dir, :validate => :array, :default => []

  # If true, only store named captures from grok.
  config :named_captures_only, :validate => :boolean, :default => true

  # If true, keep empty captures as event fields.
  config :keep_empty_captures, :validate => :boolean, :default => false

  # Append values to the 'tags' field when there has been no
  # successful match
  config :tag_on_failure, :validate => :array, :default => ["_grokparsefailure"]

  # The fields to overwrite.
  #
  # This allows you to overwrite a value in a field that already exists.
  config :overwrite, :validate => :array, :default => []

  # Detect if we are running from a jarfile, pick the right path.
  @@patterns_path ||= Set.new
  @@patterns_path += [LogStash::Environment.pattern_path("*")]

  public

  # Sets up the per-instance cache of capture handlers, keyed by the full
  # capture name ("SYNTAX:SEMANTIC:TYPE").
  def initialize(params)
    super(params)
    @handlers = {}
  end

  # Collects every pattern file reachable from the built-in pattern path and
  # the configured `patterns_dir`, then loads them into one Grok instance.
  def register
    require "grok-pure" # rubygem 'jls-grok'

    # Have @@patterns_path show first. Last-in pattern definitions win; this
    # will let folks redefine built-in patterns at runtime.
    @patterns_dir = @@patterns_path.to_a + @patterns_dir
    @logger.info("Grok patterns path", :patterns_dir => @patterns_dir) if @logger.info?

    @patternfiles = []
    @patterns_dir.each do |path|
      # A directory entry means "every file inside it".
      path = File.join(path, "*") if File.directory?(path)

      Dir.glob(path).each do |file|
        @logger.info("Grok loading patterns from file", :path => file) if @logger.info?
        @patternfiles << file
      end
    end

    @grok = Grok.new
    @patternfiles.each { |path| @grok.add_patterns_from_file(path) }
  end # def register

  # Compiles the pattern stored in `pattern_field` and matches it against
  # `match_field`. Events missing either field are left untouched. On success
  # `filter_matched` is called; on a failed compile or a failed match the
  # `tag_on_failure` tags are appended.
  def filter(event)
    return unless filter?(event)
    return if event[@match_field].nil? || event[@pattern_field].nil?

    @logger.debug("Running grok_dynamic filter", :event => event) if @logger.debug?

    if compile_pattern(event[@pattern_field]) && match(@grok, @match_field, event)
      filter_matched(event)
    else
      tag_failure(event)
    end

    @logger.debug("Event now: ", :event => event) if @logger.debug?
  end # def filter

  private

  # Compiles `pattern` into @grok. Returns true on success, false when the
  # pattern cannot be compiled. The pattern comes from event data, so a bad
  # one must not take the whole pipeline down.
  def compile_pattern(pattern)
    @grok.compile(pattern)
    true
  rescue StandardError => e
    @logger.warn("Grok pattern failed to compile",
                 :pattern => pattern, :exception => e.message)
    false
  end

  # Tag this event if we can't parse it. We can use this later to
  # reparse+reindex logs if we improve the patterns given.
  def tag_failure(event)
    @tag_on_failure.each do |tag|
      event["tags"] ||= []
      event["tags"] << tag unless event["tags"].include?(tag)
    end
  end

  # Matches the value of `field` against the compiled `grok` and stores the
  # captures on `event`. An Array value is matched element by element (every
  # element is attempted) and succeeds only if all elements match; any other
  # value is converted with `to_s` first. Returns true or false; an exception
  # raised while matching is logged and reported as false.
  def match(grok, field, event)
    input = event[field]
    if input.is_a?(Array)
      success = true
      input.each do |item|
        success = false unless apply_captures(grok, item, event)
      end
      success
    else
      # Convert anything else to string (number, hash, etc)
      apply_captures(grok, input.to_s, event)
    end
  rescue StandardError => e
    @logger.warn("Grok regexp threw exception", :exception => e.message)
    # Explicit false: the logger's return value must not decide the outcome.
    false
  end

  # Runs one match of `text` and feeds each capture to its handler.
  # Returns true when the text matched, false otherwise.
  def apply_captures(grok, text, event)
    found = grok.match(text)
    return false unless found

    found.each_capture { |capture, value| handle(capture, value, event) }
    true
  end

  # Dispatches a capture to its handler, building and caching the handler the
  # first time a capture name is seen.
  def handle(capture, value, event)
    handler = @handlers[capture] ||= compile_capture_handler(capture)
    handler.call(value, event)
  end

  # Builds the lambda that stores one capture on an event. `capture` has the
  # form SYNTAX:SEMANTIC:TYPE, where SEMANTIC and TYPE are optional. The
  # handler is a plain closure rather than eval'd source because capture names
  # originate from event data here. Every handler returns nil.
  def compile_capture_handler(capture)
    # SYNTAX:SEMANTIC:TYPE
    syntax, semantic, coerce = capture.split(":")

    # Abort early if we are only keeping named (semantic) captures
    # and this capture has no semantic name.
    return lambda { |_value, _event| nil } if semantic.nil? && @named_captures_only

    field = semantic || syntax
    keep_empty = @keep_empty_captures
    overwrite = @overwrite.include?(field)

    lambda do |value, event|
      return if !keep_empty && (value.nil? || value.empty?)

      case coerce
      when "int" then value = value.to_i
      when "float" then value = value.to_f
      end

      if overwrite
        event[field] = value
      else
        existing = event[field]
        if existing.nil?
          event[field] = value
        elsif existing.is_a?(Array)
          event[field] << value
        elsif existing.is_a?(String)
          # Promote to array since we aren't overwriting.
          event[field] = [existing, value]
        end
        # Any other existing value type is left as it is.
      end
      nil
    end
  end # def compile_capture_handler

end # class LogStash::Filters::GrokDynamic