Extracting data from multiple events from Elasticsearch

Posted 2019-09-16 01:07

I have log lines loaded into Elasticsearch, with the data I need scattered across multiple events: say event_id is in event (line) number 5, event_action is in event number 88, and event_port is in event number 455. How can I extract this data so that my output looks like the following? The multiline codec will not work for this case.

{
  "event_id": 1223,
  "event_action": "socket_open",
  "event_port": 76654
}

Currently I have the log files persisted, so I can get the file path from ES. I tried executing a shell script from a ruby filter; the script runs grep commands and puts the stdout data into a new event, like the following.

input {
  elasticsearch {
    hosts => "localhost:9200"
    index => "my-logs"
  }
}

filter {
  ruby {
    code => '
      require "open3"
      file_path = event.get("file_path")
      cmd = "my_filter.sh -f #{file_path}"
      # capture3 runs the command and returns stdout, stderr and the
      # exit status, closing the pipes for us (popen3 would leak them).
      stdout, stderr, status = Open3.capture3(cmd)
      event.set("process_result", stdout)
      if status.success? && stderr.to_s.empty?
        filter_matched(event)
      else
        event.set("ext_script_err_msg", stderr)
      end
    '
    remove_field => ["file_path"]
  }
}

With the above approach I am facing two problems:

1) Running grep on huge files can be time consuming. Is there an alternative that avoids grepping the files altogether? (See the first sketch after this list.)

2) My input plugin (shown above) takes events from Elasticsearch, and every event in the index has file_path set, so my_filter.sh ends up being executed once per event, which I want to avoid. How can I extract unique file_path values from ES? (See the second sketch after this list.)
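A note on 1): instead of grepping the raw files, one alternative is to look up the companion events in Elasticsearch itself with the elasticsearch filter plugin. This is only a sketch, and it assumes the scattered events share a correlation field (correlation_id here is invented; the question does not show one):

filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my-logs"
    # Look up the companion event carrying event_action, assuming a
    # shared correlation_id field exists (hypothetical).
    query => "event_action:* AND correlation_id:%{[correlation_id]}"
    # Copy the field from the matched document onto the current event.
    fields => { "event_action" => "event_action" }
  }
}

Whether this beats grep depends on the index, and if no correlation field exists the approach does not apply at all.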
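A note on 2): one workaround is to deduplicate inside the pipeline, before the filter that runs the script. A minimal sketch, assuming a single pipeline worker (-w 1) since the cache below is not thread-safe:

filter {
  ruby {
    # Remember which file paths were already handled so the external
    # script runs once per file rather than once per event.
    init => "@seen_paths = {}"
    code => '
      path = event.get("file_path")
      if @seen_paths[path]
        event.cancel   # duplicate: this file was already processed
      else
        @seen_paths[path] = true
      end
    '
  }
}

Placed before the ruby filter above, this cancels every event after the first one per file_path.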

1 Answer
Bombasti · 2019-09-16 02:04

Elasticsearch was not designed to build an output stream that depends on its input. Elasticsearch is a NoSQL datastore whose data should be consumed over time (in a near-real-time fashion). That means you should first store everything in Elasticsearch and process the data afterwards. In your case, you are stalling the flow by waiting for different events.

If you really need to catch these events and process them in the background, you could try something like nxlog before filtering in Logstash (with nxlog as the input), or a Python script (used as a filter in Logstash). In your case I would pre-process the data to consolidate it, and then send it to Logstash.
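To illustrate that consolidation step, here is a minimal Ruby sketch that walks a log file once and emits one merged JSON event. The field patterns are assumptions (the question never shows the raw log format), so treat it as a template rather than working extraction logic:

#!/usr/bin/env ruby
# consolidate.rb -- one pass over the file instead of repeated greps.
require "json"

result = {}
File.foreach(ARGV[0]) do |line|
  # Hypothetical patterns: adjust to the real log format.
  case line
  when /event_id=(\d+)/     then result["event_id"]     = $1.to_i
  when /event_action=(\w+)/ then result["event_action"] = $1
  when /event_port=(\d+)/   then result["event_port"]   = $1.to_i
  end
end

# One consolidated event per file, ready for a Logstash json codec.
puts JSON.generate(result)

Run it as ruby consolidate.rb /path/to/logfile and feed the output to Logstash; the per-event grep disappears entirely.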
