Logstash -> Elasticsearch - update denormalized da

Posted 2019-04-10 15:31

Question:

Use case explanation

We have a relational database with data about our day-to-day operations. The goal is to allow users to search the important data with a full-text search engine. The data is normalized and thus not in the best form to make full-text queries, so the idea was to denormalize a subset of the data and copy it in real-time to Elasticsearch, which allows us to create a fast and accurate search application.

We already have a system in place that enables Event Sourcing of our database operations (inserts, updates, deletes). The events contain only the changed columns and the primary keys (on an update we don't get the whole row). Logstash is already notified of each event, so this part is handled.


Actual problem

Since the plan is to denormalize our data, we have to make sure that updates to parent objects are propagated to the denormalized child objects in Elasticsearch. How can we configure Logstash to do this?

Example

Let's say we maintain a list of Employees in Elasticsearch. Each Employee is assigned to a Company. Since the data is denormalized (for the purpose of faster search), each Employee also carries the name and address of the Company. When an update changes the name of a Company, how can we configure Logstash to update the company name in all Employees assigned to that Company?


Additional explanation

@Darth_Vader: The problem we are facing is that we get an event saying a Company has changed, but we want to modify documents of type Employee in Elasticsearch, because they carry a copy of the company data themselves. Your answer assumes that we get an event for every Employee, which is not the case.

Maybe this will make it clearer. We have 3 employees in Elasticsearch:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

Then an update happens in the source DB.

UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;

Logstash receives an event that looks something like this:

{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}

This should then be propagated to Elasticsearch, so that the resulting employees are:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

Notice that the field company.name changed for the two employees with company.cmp_id 1, while the employee of Company B is untouched.
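To pin down the expected behavior independently of Logstash or Elasticsearch, here is a minimal in-memory sketch in Python. The list `employees` and the function `propagate_company_change` are hypothetical names used only for illustration, and the sketch assumes changed columns always arrive under keys prefixed with `new.`, as in the sample event above.

```python
# In-memory stand-in for the employee documents (illustration only,
# not an Elasticsearch client).
employees = [
    {"type": "employee", "id": "1", "name": "Person 1",
     "company": {"cmp_id": "1", "name": "Company A"}},
    {"type": "employee", "id": "2", "name": "Person 2",
     "company": {"cmp_id": "1", "name": "Company A"}},
    {"type": "employee", "id": "3", "name": "Person 3",
     "company": {"cmp_id": "2", "name": "Company B"}},
]


def propagate_company_change(docs, event):
    """Copy every changed company column into the matching employees.

    Returns the number of employee documents that were touched.
    """
    # Assumption: changed columns are the keys that start with "new.".
    changed = {key[len("new."):]: value
               for key, value in event.items()
               if key.startswith("new.")}
    touched = 0
    for doc in docs:
        if doc["company"]["cmp_id"] == event["cmp_id"]:
            doc["company"].update(changed)
            touched += 1
    return touched


event = {"type": "company", "cmp_id": "1",
         "old.name": "Company A", "new.name": "Company NEW"}
touched = propagate_company_change(employees, event)
print(touched, [doc["company"]["name"] for doc in employees])
```

One company event fans out to many employee documents, which is why a per-document upsert keyed on the event's own id is not enough here.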

Answer 1:

I suggest a solution similar to the one I've posted here: use the http output plugin to issue an update-by-query call against the employees index. The request would need to look like this:

POST employees/_update_by_query
{
  "script": {
    "source": "ctx._source.company.name = params.name",
    "lang": "painless",
    "params": {
      "name": "Company NEW"
    }
  },
  "query": {
    "term": {
      "company.cmp_id": "1"
    }
  }
}
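As a rough sketch of how this request body relates to the incoming event, the following Python function derives it from the event fields. `build_update_by_query` is a hypothetical helper, not part of any Elasticsearch client. It assumes changed columns arrive under `new.`-prefixed keys and that column names are trusted identifiers, since they are spliced into the script source; the values themselves are passed through `params`.

```python
import json


def build_update_by_query(event):
    """Turn a company change event into an _update_by_query body."""
    # Assumption: every key starting with "new." is a changed column.
    changed = {key[len("new."):]: value
               for key, value in event.items()
               if key.startswith("new.")}
    # One assignment per changed column; values go in params, not inline.
    source = "; ".join(
        f"ctx._source.company.{column} = params.{column}"
        for column in sorted(changed)
    )
    return {
        "script": {"source": source, "lang": "painless", "params": changed},
        "query": {"term": {"company.cmp_id": event["cmp_id"]}},
    }


event = {"type": "company", "cmp_id": "1",
         "old.name": "Company A", "new.name": "Company NEW"}
body = build_update_by_query(event)
print(json.dumps(body, indent=2))
```

For the sample event this yields the same script, params, and term query as the request above.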

So your Logstash config should look like this:

input {
  ... 
}
filter {
  mutate {
    # Build the _update_by_query request body from the event fields
    add_field => {
      "[script][lang]" => "painless"
      "[script][source]" => "ctx._source.company.name = params.name"
      "[script][params][name]" => "%{new.name}"
      "[query][term][company.cmp_id]" => "%{cmp_id}"
    }
    # Drop the original event fields so they are not sent in the request body
    remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"]
  }
}
output {
  http {
    url => "http://localhost:9200/employees/_update_by_query"
    http_method => "post"
    format => "json"
  }
}
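Before wiring this into the pipeline, it may help to try the same call outside Logstash. The sketch below uses only the Python standard library to build the POST request that the http output is expected to send; `make_update_request` is a hypothetical helper, and the URL and index name are simply taken from the config above. The request is built but not sent, so it runs without a cluster.

```python
import json
import urllib.request


def make_update_request(body, base_url="http://localhost:9200", index="employees"):
    """Build (but do not send) the POST request for _update_by_query."""
    return urllib.request.Request(
        url=f"{base_url}/{index}/_update_by_query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


body = {
    "script": {
        "source": "ctx._source.company.name = params.name",
        "lang": "painless",
        "params": {"name": "Company NEW"},
    },
    "query": {"term": {"company.cmp_id": "1"}},
}
request = make_update_request(body)
print(request.get_method(), request.full_url)

# To actually send it against a running cluster:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

If the request works by hand but not from Logstash, the difference is most likely in the event fields that remain after the mutate filter.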