Apache Camel iterate over List

Posted 2019-08-03 01:49

I am new to Apache Camel and I am having problems understanding how to implement a simple integration task:

  1. A REST service calls a Spring Bean through an Apache Camel route
  2. The Spring Bean returns a collection of objects (an ArrayList)

I need to iterate over each item in the collection and convert it to another type through the custom converter.

It seems that I should use a Splitter and an Aggregator, but how do I constrain the Aggregator to consume exactly the items from the original list, no more and no less? Also, how can I convert one item to another? Should I use a Type Converter?

Can someone provide me a simple example?

UPDATE 1

Sorry, I had to undo acceptance of the provided example since it does not actually answer my question. To clarify the use case: I need to split and transform the return value of a to("bean:someBean") invocation, rather than split and transform input values from some endpoint.

So the use case is

  1. Call some endpoint, e.g. a GET request on a REST service; in my case: from("endpoint")
  2. Call a bean and get its return value, such as a List: to("bean:someBean")
  3. Transform the returned value into another List
  4. Return the transformed List to the consumer

UPDATE 2

So, I can confirm that using the end() method does not solve my problem.

Here is the code:

rest("some/service").get().produces("application/json").to("bean:someBean?method=getListOfObjects").route().split(body(), (oldExchange, newExchange) -> {
                List<ObjectToConvert> oldList = oldExchange.getIn(List.class);
                List<NewObject> convertedList = taskList.stream().map(ObjectToConvert::new).collect(Collectors.toList());
                newExchange.getOut().setBody(convertedList);

                return newExchange;
            }).end();

Using this kind of route I get the following exception on the application server:

19:30:21,126 ERROR [org.jboss.as.controller.management-operation] (XNIO-1 task-5) JBAS014613: Operation ("full-replace-deployment") failed - address: (undefined) - failure description: {"JBAS014671: Failed services" => {"jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel" => "org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel: Failed to start service
    Caused by: java.lang.RuntimeException: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
    Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
    Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
    Caused by: java.lang.IllegalArgumentException: Definition has no children on Split[{body} -> []]"}}

3 Answers
beautiful°
#2 · 2019-08-03 01:50

Here is a complete example that splits, aggregates, and converts a list message.

  1. The Camel splitter provides a built-in aggregator which aggregates all split messages belonging to the original exchange. So the splitter only aggregates the messages split from each list (exchange) sent to "direct:start". You have to provide a custom aggregation strategy because by default the splitter returns the original exchange, which in my example still holds the InOrder list. The aggregation strategy is the second argument of the split definition.
  2. The type converter gives us the opportunity to use convertBodyTo in the DSL. You could also achieve that with a bean or a processor, or do all the transformations in the custom aggregation strategy.

    package org.mydemocamel.app;
    import org.apache.camel.*;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.mock.MockEndpoint;
    import org.apache.camel.processor.aggregate.AggregationStrategy;
    import org.apache.camel.support.TypeConverterSupport;
    import org.apache.camel.test.junit4.CamelTestSupport;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class CamelSplitAggregateConvertTest extends CamelTestSupport {
    
        @Produce
        private ProducerTemplate template;
    
        @EndpointInject(uri = "mock:out")
        private MockEndpoint mock;
    
        @Test
        public void testSplitAggregateConvertOrder() throws Exception {
            InOrder inOrder1 = new InOrder();
            inOrder1.setId("1");
    
            InOrder inOrder2 = new InOrder();
            inOrder2.setId("2");
    
            List<InOrder> inOrderList = new ArrayList<InOrder>();
            inOrderList.add(inOrder1);
            inOrderList.add(inOrder2);
    
            // Set the expectation before sending so the mock can verify it.
            mock.expectedMessageCount(1);
    
            template.sendBody("direct:start", inOrderList);
    
            // Fails the test if the mock did not receive exactly one message.
            assertMockEndpointsSatisfied();
    
            Exchange outList = mock.getReceivedExchanges().get(0);
            @SuppressWarnings("unchecked")
            List<OutOrder> outOrderList = outList.getIn().getBody(List.class);
    
            assertEquals(1, outOrderList.get(0).getId());
            assertEquals(2, outOrderList.get(1).getId());
        }
    
        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {
            return new RouteBuilder() {
                @Override
                public void configure() throws Exception {
    
                    context.getTypeConverterRegistry().addTypeConverter(OutOrder.class, InOrder.class, new MyOrderTypeConverter());
    
                    from("direct:start")
                    .split(body(), new AggregationStrategy() {
                        @Override
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                List<OutOrder> orders = new ArrayList<OutOrder>();
                                OutOrder newOrder = newExchange.getIn().getBody(OutOrder.class);
                                orders.add(newOrder);
                                newExchange.getIn().setBody(orders);
                                return newExchange;
                            }
                            List<OutOrder> orders = oldExchange.getIn().getBody(List.class);
                            OutOrder newOutOrder = newExchange.getIn().getBody(OutOrder.class);
                            orders.add(newOutOrder);
                            oldExchange.getIn().setBody(orders);
                            return oldExchange;
                        }
                    })
                    .convertBodyTo(OutOrder.class)
                    .end()  //splitter ends here and the exchange body  is now List<OutOrder> 
                    .to("mock:out");
    
                }
            };
        }
    
        private static class MyOrderTypeConverter extends TypeConverterSupport {
    
            @SuppressWarnings("unchecked")
            public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
                // converter from inorder to outorder bean
                OutOrder order = new OutOrder();
                order.setId(Integer.parseInt(((InOrder) value).getId()));
                return (T) order;
            }
        }
    
        private static class OutOrder {
            private int id;
    
            public void setId(int id) {
                this.id = id;
            }
    
            public int getId() {
                return id;
            }
        }
    
        private static class InOrder {
            private String id;
    
            public void setId(String id) {
                this.id = id;
            }
    
            public String getId() {
                return id;
            }
        }
    }
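
The null check on oldExchange in the aggregation strategy above can be easier to follow outside of Camel. The following is a minimal plain-Java sketch of the same split, convert, and aggregate flow, not Camel code: the class and method names are hypothetical, the `aggregated` variable stands in for oldExchange, and returning an empty list for empty input is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java stand-in for the splitter's aggregation loop; it uses no Camel classes.
final class SplitAggregateSketch {

    // Converts every element of 'input' and collects the results into one list.
    static <I, O> List<O> splitConvertAggregate(List<I> input, Function<I, O> converter) {
        List<O> aggregated = null;                  // plays the role of oldExchange
        for (I item : input) {                      // "split": one pass per element
            O converted = converter.apply(item);    // "convertBodyTo"
            if (aggregated == null) {               // first item: nothing aggregated yet
                aggregated = new ArrayList<>();
            }
            aggregated.add(converted);              // later items: append to the same list
        }
        // Assumption of this sketch: an empty input yields an empty list.
        return aggregated == null ? new ArrayList<>() : aggregated;
    }

    public static void main(String[] args) {
        List<Integer> ids = splitConvertAggregate(List.of("1", "2"), s -> Integer.parseInt(s));
        System.out.println(ids); // prints [1, 2]
    }
}
```

Because the loop runs once per element of the input list, the result always has exactly as many entries as the original list, which is the constraint the question asks about.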
    
放我归山
#3 · 2019-08-03 01:58

You are right about the splitter and aggregator; the "Split aggregate request/reply sample" at http://camel.apache.org/splitter.html shows what you need.

Do you need a "converted" list of objects after the splitter? If yes, the point "Using a List in AggregationStrategy" from http://camel.apache.org/aggregator2.html looks right for your needs.

kind regards, soilworker

【Aperson】
#4 · 2019-08-03 02:02

For future reference, another way you can iterate over a list is using the loop construct of the Java DSL. Here's an example:

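from("direct:myDirect")
// Set the LIST_LENGTH header to the list length in an earlier processor.
.loop(header("LIST_LENGTH"))
.process(new Processor() {

      @Override
      public void process(Exchange exchange) {
          // Camel stores the current iteration (starting at 0) in the LOOP_INDEX exchange property.
          int index = exchange.getProperty(Exchange.LOOP_INDEX, Integer.class);
          MyObj currentElement = (MyObj) exchange.getIn().getBody(List.class).get(index);
          // Do your processing here.
      }
})
.end()
.end();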

The LOOP_INDEX exchange property (Exchange.LOOP_INDEX) contains the current iteration, starting from 0 and ending at the LIST_LENGTH header value minus 1, so you can use it to get the current element from the list.

Notice the double end() method call: one is for ending the loop and the other one is to end the route.

Documentation: http://camel.apache.org/loop.html
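
For the use case in the question, where the whole list returned by a bean only needs to be converted to another list, per-item routing may not be needed at all. A plain Java method can convert the list in one step, and a route could call it as a further bean step after to("bean:someBean"). The sketch below is only an illustration under assumptions: the class name OrderListConverter, the method convertAll, and the simplified InOrder and OutOrder types are hypothetical stand-ins and not part of Camel.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical converter bean: maps a list of InOrder objects to a list of OutOrder objects.
final class OrderListConverter {

    // Simplified stand-in for the source type (String id).
    static final class InOrder {
        private final String id;
        InOrder(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Simplified stand-in for the target type (int id).
    static final class OutOrder {
        private final int id;
        OutOrder(int id) { this.id = id; }
        int getId() { return id; }
    }

    // Converts each element and returns a new list of the same size and order.
    public List<OutOrder> convertAll(List<InOrder> orders) {
        return orders.stream()
                .map(order -> new OutOrder(Integer.parseInt(order.getId())))
                .collect(Collectors.toList());
    }
}
```

The trade-off is that the conversion runs as a single step, so the items cannot be routed or handled individually the way they can inside a split or loop block.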
