Apache Camel: File to BeanIO and merging BeanIO objects

Posted 2019-08-04 09:01

My use case is to read the employee, address and contact files in parallel, convert each one to BeanIO objects, and then merge those objects to produce a complete EmployeeDetails object.

Emp File:

1 Foo Engineer
2 Bar AssistantEngineer

Emp Contact File:

1 8912345678  foo@org.com
2 7812345678    bar@org.com

Emp Address File:

 1 city1 1234
 2 city2 2345

Expected output, in an EmployeeDetailsBeanIODataFormat object in the Exchange:

1 Foo Engineer foo@org.com city1 1234
2 Bar AssistantEngineer bar@org.com city2 2345

I have the following routes:

from("file://C:/cameltest/employee.txt").to("seda:beanIO");
from("file://C:/cameltest/employeeContact.txt").to("seda:beanIOContact");
from("file://C:/cameltest/employeeAddress.txt").to("seda:beanIOAddress");

Each file is converted to BeanIO objects:

BeanIODataFormat empFormat = new BeanIODataFormat("beanIO.xml","emp");
BeanIODataFormat empContactFormat = new BeanIODataFormat("beanIO.xml", "empContact");
BeanIODataFormat empAddressFormat = new BeanIODataFormat("beanIO.xml", "empAddress");

from("seda:beanIO").unmarshal(empFormat).log("body - ${body}");        
from("seda:beanIOContact").unmarshal(empContactFormat).log("Contact body ${body}");
from("seda:beanIOAddress").unmarshal(empAddressFormat).log("Address body - ${body}");

The output logs the bean objects correctly.

Now I need to merge the objects to form an EmployeeDetails object. Can someone let me know how to do this? From what I have read, an Aggregator seems to be the right tool for the job, but I am not sure how to approach it.

Any idea with a sample would be helpful, and suggestions are welcome. Is it advisable to merge the files first based on employee id and create an object from the result? I don't want to write the merged file to disk in that case, because the extra I/O would hurt performance.
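For reference, the merge by employee id described above does not need a temporary file; the three record sets can be joined in memory. The following is a minimal plain-Java sketch of that idea, not Camel or BeanIO code. The class name InMemoryMerge and the methods indexById and merge are hypothetical, and it assumes whitespace-separated fields with the id in the first column, as in the sample files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: joins employee, contact and address lines on the id column.
public class InMemoryMerge {

    /** Indexes whitespace-separated records by their first column (the employee id). */
    static Map<String, String[]> indexById(List<String> lines) {
        Map<String, String[]> byId = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore blank lines
            }
            String[] fields = trimmed.split("\\s+");
            byId.put(fields[0], fields);
        }
        return byId;
    }

    /** Produces "id name job email city code" for every id present in all three inputs. */
    public static List<String> merge(List<String> emp, List<String> contact, List<String> address) {
        Map<String, String[]> contacts = indexById(contact);
        Map<String, String[]> addresses = indexById(address);
        List<String> merged = new ArrayList<>();
        for (String[] e : indexById(emp).values()) {
            String[] c = contacts.get(e[0]);
            String[] a = addresses.get(e[0]);
            if (c == null || a == null) {
                continue; // assumption: skip employees with a missing contact or address
            }
            // c[1] is the phone number, which the expected output omits; c[2] is the email.
            merged.add(String.join(" ", e[0], e[1], e[2], c[2], a[1], a[2]));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> emp = Arrays.asList("1 Foo Engineer", "2 Bar AssistantEngineer");
        List<String> contact = Arrays.asList("1 8912345678  foo@org.com", "2 7812345678    bar@org.com");
        List<String> address = Arrays.asList(" 1 city1 1234", " 2 city2 2345");
        merge(emp, contact, address).forEach(System.out::println);
    }
}
```

With the sample data from the question, this prints the two expected output lines. It only works once all three files have been read completely, so it trades the streaming behaviour of separate routes for simplicity.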

Thanks in Advance.

1 answer
贪生不怕死
#2 · 2019-08-04 09:18

Use a splitter to split each message into individual records after unmarshalling:

from("seda:beanIO").unmarshal(empFormat).split(body()).to("seda:aggregate");
from("seda:beanIOContact").unmarshal(empContactFormat).split(body()).to("seda:aggregate");
from("seda:beanIOAddress").unmarshal(empAddressFormat).split(body()).to("seda:aggregate");

Here is what the aggregator could then look like. The details object is stored as a header on oldExchange. The most important parameters are the following:

  1. correlationExpression: simple("${body.id}") correlates all messages with the same id (1 or 2).
  2. completionSize = 3: one message for each file.
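To make these two parameters concrete, here is a minimal plain-Java sketch of what correlating by id with a completion size of 3 amounts to. It is not Camel code and does not use any Camel API: CorrelationSketch, Part and add are hypothetical names, and the payloads are plain strings standing in for the unmarshalled beans.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the aggregator: groups parts by id until a group is full.
public class CorrelationSketch {

    /** One split record: a correlation id plus an opaque payload. */
    public static final class Part {
        final int id;
        final String payload;

        public Part(int id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private final int completionSize;
    private final Map<Integer, List<String>> pending = new HashMap<>();

    public CorrelationSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    /**
     * Adds one part to the group for its id. Returns the completed group once
     * completionSize parts with that id have arrived, otherwise null.
     */
    public List<String> add(Part part) {
        List<String> group = pending.computeIfAbsent(part.id, k -> new ArrayList<>());
        group.add(part.payload);
        if (group.size() == completionSize) {
            pending.remove(part.id); // the group is done; later parts would start a new one
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        CorrelationSketch aggregator = new CorrelationSketch(3);
        // Parts from the three files can arrive interleaved, in any order.
        Part[] arrivals = {
            new Part(1, "Foo Engineer"),
            new Part(2, "7812345678 bar@org.com"),
            new Part(1, "city1 1234"),
            new Part(1, "8912345678 foo@org.com"),
            new Part(2, "Bar AssistantEngineer"),
            new Part(2, "city2 2345")
        };
        for (Part p : arrivals) {
            List<String> done = aggregator.add(p);
            if (done != null) {
                System.out.println("id " + p.id + " complete: " + done);
            }
        }
    }
}
```

The sketch also shows the main caveat of a pure completion size: if one of the three files has no record for an id, that group never reaches size 3 and stays pending.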

from("seda:aggregate").aggregate(simple("${body.id}"), (oldExchange,newExchange) -> {
        if (oldExchange == null) {
            EmployeeDetails details = buildDetails(new EmployeeDetails(), newExchange);
            newExchange.getIn().setHeader("details", details);
            return newExchange;
        }
        EmployeeDetails details = oldExchange.getIn().getHeader("details", EmployeeDetails.class);
        buildDetails(details, newExchange);
        oldExchange.getIn().setHeader("details", details);
        return oldExchange;
    }).completionSize(3).log("Details - ${header.details}");

The buildDetails helper copies the fields of each incoming bean into the details object:

private EmployeeDetails buildDetails(EmployeeDetails details, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        if (newBody instanceof Employee) {
            details.setId(((Employee) newBody).getId());
            details.setName(((Employee) newBody).getName());
            details.setJob(((Employee) newBody).getJob());
        } else if (newBody instanceof EmployeeContact) {
            details.setEmail(((EmployeeContact) newBody).getEmail());
        } else if (newBody instanceof EmployeeAddress) {
            details.setCity(((EmployeeAddress) newBody).getCity());
            details.setCode(((EmployeeAddress) newBody).getCode());
        }
        return details;
    }

The outcome is then two details objects:

Details - EmployeeDetails(id=1, name=Foo, job=Engineer, email=foo@org.com, city=city1, code=1234)
Details - EmployeeDetails(id=2, name=Bar, job=AssistantEngineer, email=bar@org.com, city=city2, code=2345)
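
The EmployeeDetails class itself is not shown above. A minimal sketch that is consistent with the setters used in buildDetails and with the log output could look like the following. The field types are assumptions (id as int, everything else as String), and the hand-written toString simply reproduces the format seen in the log.

```java
// Sketch of the merged bean; field types are assumed, not taken from the original post.
public class EmployeeDetails {
    private int id;
    private String name;
    private String job;
    private String email;
    private String city;
    private String code;

    public void setId(int id) { this.id = id; }
    public void setName(String name) { this.name = name; }
    public void setJob(String job) { this.job = job; }
    public void setEmail(String email) { this.email = email; }
    public void setCity(String city) { this.city = city; }
    public void setCode(String code) { this.code = code; }

    @Override
    public String toString() {
        // Same layout as the "Details - ..." log lines.
        return "EmployeeDetails(id=" + id + ", name=" + name + ", job=" + job
                + ", email=" + email + ", city=" + city + ", code=" + code + ")";
    }

    public static void main(String[] args) {
        EmployeeDetails details = new EmployeeDetails();
        details.setId(1);
        details.setName("Foo");
        details.setJob("Engineer");
        details.setEmail("foo@org.com");
        details.setCity("city1");
        details.setCode("1234");
        System.out.println("Details - " + details);
    }
}
```

Note that the correlation expression ${body.id} is evaluated on every incoming bean, so Employee, EmployeeContact and EmployeeAddress each need an id property as well.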