I am new to Apache Camel and I am having problems understanding how to implement a simple integration task:
- A REST service is calling a Spring Bean through Apache Camel route
- Spring Bean returns a collection of classes (ArrayList)
I need to iterate over each item in the collection and convert it to another type through a custom converter.
It seems that I should use a Splitter and an Aggregator, but how do I constrain the Aggregator to consume all items from the original list, no more and no fewer? Also, how can I convert one item to another? Should I use a Type Converter?
Can someone provide me a simple example?
UPDATE 1
Sorry, I had to undo acceptance of the provided example since it does not actually answer my question. Here is the clarified use case:
I need to split and transform the return value of a to("bean:someBean") invocation, rather than split and transform input values from some endpoint.
So the use case is:
- Call some endpoint, e.g. a GET request on a REST service; in my case: from("endpoint")
- Call a bean and get its return value (like List, to("bean:someBean"))
- Transform the returned value to another List
- Return the transformed List to the consumer
UPDATE 2
So, I can confirm that using the end() method does not solve my problem.
Here is the code:
rest("some/service").get().produces("application/json")
    .to("bean:someBean?method=getListOfObjects")
    .route()
    .split(body(), (oldExchange, newExchange) -> {
        List<ObjectToConvert> oldList = oldExchange.getIn(List.class);
        List<NewObject> convertedList = taskList.stream().map(ObjectToConvert::new).collect(Collectors.toList());
        newExchange.getOut().setBody(convertedList);
        return newExchange;
    })
    .end();
Using this kind of route I get the following exception on the application server:
19:30:21,126 ERROR [org.jboss.as.controller.management-operation] (XNIO-1 task-5) JBAS014613: Operation ("full-replace-deployment") failed - address: (undefined) - failure description: {"JBAS014671: Failed services" => {"jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel" => "org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel: Failed to start service
Caused by: java.lang.RuntimeException: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
Caused by: java.lang.IllegalArgumentException: Definition has no children on Split[{body} -> []]"}}
Here is a complete example that splits, aggregates, and converts a list message.
- The Camel splitter provides a built-in aggregator that aggregates all split messages back into the original exchange. So the splitter only aggregates the messages of each list (exchange) sent to "direct:start". You have to provide a custom aggregation strategy because the default one returns the original exchange, which in my example still holds the InOrder items. The aggregation strategy is the second argument of the split definition.
- The type converter lets us use convertBodyTo in the DSL. You could also achieve that with a bean or a processor, or do all the transformations in the custom aggregation strategy.
package org.mydemocamel.app;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.support.TypeConverterSupport;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class CamelSplitAggregateConvertTest extends CamelTestSupport {

    @Produce
    private ProducerTemplate template;

    @EndpointInject(uri = "mock:out")
    private MockEndpoint mock;

    @Test
    public void testSplitAggregateConvertOrder() throws Exception {
        InOrder inOrder1 = new InOrder();
        inOrder1.setId("1");
        InOrder inOrder2 = new InOrder();
        inOrder2.setId("2");
        List<InOrder> inOrderList = new ArrayList<InOrder>();
        inOrderList.add(inOrder1);
        inOrderList.add(inOrder2);

        // Set the expectation before sending, then verify it after the send.
        mock.expectedMessageCount(1);
        template.sendBody("direct:start", inOrderList);
        assertMockEndpointsSatisfied();

        Exchange outList = mock.getReceivedExchanges().get(0);
        List<OutOrder> outOrderList = outList.getIn().getBody(List.class);
        assertEquals(1, outOrderList.get(0).getId());
        assertEquals(2, outOrderList.get(1).getId());
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Register the InOrder -> OutOrder converter used by convertBodyTo below.
                context.getTypeConverterRegistry().addTypeConverter(OutOrder.class, InOrder.class, new MyOrderTypeConverter());

                from("direct:start")
                    .split(body(), new AggregationStrategy() {
                        @Override
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                // First split message: start the result list.
                                List<OutOrder> orders = new ArrayList<OutOrder>();
                                OutOrder newOrder = newExchange.getIn().getBody(OutOrder.class);
                                orders.add(newOrder);
                                newExchange.getIn().setBody(orders);
                                return newExchange;
                            }
                            // Later split messages: append to the list built so far.
                            List<OutOrder> orders = oldExchange.getIn().getBody(List.class);
                            OutOrder newOutOrder = newExchange.getIn().getBody(OutOrder.class);
                            orders.add(newOutOrder);
                            oldExchange.getIn().setBody(orders);
                            return oldExchange;
                        }
                    })
                        .convertBodyTo(OutOrder.class)
                    .end() // splitter ends here and the exchange body is now List<OutOrder>
                    .to("mock:out");
            }
        };
    }

    private static class MyOrderTypeConverter extends TypeConverterSupport {
        @SuppressWarnings("unchecked")
        public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
            // converter from InOrder to OutOrder bean
            OutOrder order = new OutOrder();
            order.setId(Integer.parseInt(((InOrder) value).getId()));
            return (T) order;
        }
    }

    private static class OutOrder {
        private int id;

        public void setId(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }

    private static class InOrder {
        private String id;

        public void setId(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }
}
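To make the "no more, no fewer" point concrete, here is a plain-Java sketch of what the splitter does with the aggregation strategy: it calls the strategy exactly once per item of the list it split, passing null as the "old" value on the first call. This is only a model and does not use Camel at all; SplitAggregateModel, collect and splitConvertAggregate are names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of split + convert + aggregate; none of these names are Camel API.
class SplitAggregateModel {

    // Same shape as the aggregate(oldExchange, newExchange) callback above:
    // 'old' is null on the first call, afterwards it is the previous result.
    static <T> List<T> collect(List<T> old, T item) {
        List<T> result = (old == null) ? new ArrayList<T>() : old;
        result.add(item);
        return result;
    }

    // Models the splitter: convert each item, then hand it to the strategy once.
    // Returns null for an empty input, because the strategy is never called.
    static <I, O> List<O> splitConvertAggregate(List<I> body,
                                                Function<I, O> converter,
                                                BiFunction<List<O>, O, List<O>> strategy) {
        List<O> aggregated = null;
        for (I item : body) {
            aggregated = strategy.apply(aggregated, converter.apply(item));
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("1", "2");
        Function<String, Integer> converter = Integer::parseInt;
        List<Integer> out = splitConvertAggregate(in, converter, SplitAggregateModel::collect);
        System.out.println(out); // prints [1, 2]
    }
}
```

The loop is bounded by the list that was split, so there is no completion size or timeout to configure as there would be with a standalone aggregator.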
You are right about the splitter and aggregator; the "Split aggregate request/reply sample" at http://camel.apache.org/splitter.html shows what you need.
Do you need a "converted" list of objects after the splitter?
If so, the section "Using a List in AggregationStrategy" at http://camel.apache.org/aggregator2.html looks right for your needs.
Kind regards,
soilworker
For future reference, another way you can iterate over a list is using the loop construct of the Java DSL. Here's an example:
from("direct:myDirect")
    .loop(header("LIST_LENGTH")) // Set this header to the list length in a preceding processor.
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) {
                // Exchange.LOOP_INDEX is the exchange property holding the zero-based iteration index.
                int index = exchange.getProperty(Exchange.LOOP_INDEX, Integer.class);
                MyObj currentElement = (MyObj) exchange.getIn().getBody(List.class).get(index);
                // Do your processing here.
            }
        })
    .end()
    .end();
The LOOP_INDEX exchange property contains the index of the current iteration, starting at 0 and ending at the LIST_LENGTH header value minus 1, so you can use it to get the current element from the list.
Notice the double end() method call: one ends the loop and the other ends the route.
Documentation: http://camel.apache.org/loop.html
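If the index bounds are unclear, this plain-Java sketch models what the loop route above does: the body stays the whole list, and each iteration picks one element by its zero-based index. It does not use Camel; LoopIndexModel and visit are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the loop route; the names here are illustrative, not Camel API.
class LoopIndexModel {

    // Visits indices 0 .. listLength - 1 and reads the current element
    // from the unchanged list body on every iteration.
    static List<String> visit(List<String> body, int listLength) {
        List<String> visited = new ArrayList<String>();
        for (int loopIndex = 0; loopIndex < listLength; loopIndex++) {
            visited.add(loopIndex + ":" + body.get(loopIndex));
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> body = Arrays.asList("a", "b", "c");
        System.out.println(visit(body, body.size())); // prints [0:a, 1:b, 2:c]
    }
}
```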