Camel Splitter Parallel Processing Array List - Co

Posted 2019-04-17 03:05

Question:

I am using Camel to split an ArrayList and process each item in parallel on up to 10 threads. The configuration is below. The thread pool profile is set to a maximum thread count of 10.

<camel:route id="ReportsRoute">
    <camel:from uri="direct:processReportsChannel" />
    <camel:to uri="bean:reportRepository?method=getPendingTransactions" />
    <camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile">
        <camel:simple>${body}</camel:simple>
        <camel:doTry>
            <camel:to uri="direct:processReportChannel" />
            <camel:doCatch>
                <camel:exception>java.lang.Exception</camel:exception>
                <camel:handled>
                    <camel:constant>true</camel:constant>
                </camel:handled>
                <camel:to uri="bean:ReportRepository?method=markAsFailed"/>
                <camel:wireTap uri="direct:loggingAndNotificationChannel" />
            </camel:doCatch>
        </camel:doTry>
    </camel:split>
</camel:route>

bean:reportRepository?method=getPendingTransactions returns the ArrayList, which is then passed to the Splitter.

processReportChannel is the processor that handles each item.

Problem: 10 threads start when the job starts, but some threads pick up the same item. For example, if the ArrayList contains item_no_1 through item_no_10, thread_no_1 and thread_no_2 (and sometimes more threads) both pick up, say, item_no_2. Is this because ArrayList is not thread-safe and the Splitter doesn't manage that?

I'm not an expert in this and need help pinpointing where the issue is.

Answer 1:

I tested with the following (simpler) setup:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="ReportsRoute">
        <from uri="direct:start" />
        <!-- By default a pool size of 10 is used. -->
        <split parallelProcessing="true">
            <simple>${body}</simple>
            <to uri="direct:sub" />
        </split>
    </route>  
    <route>
        <from uri="direct:sub"/>
        <log message="Processing item ${body}" />
    </route>
</camelContext>

Testing:

// template is assumed to be an org.apache.camel.ProducerTemplate for this context
List<Object> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
    list.add("And we go and go: " + (i + 1));
}
template.sendBody("direct:start", list);

With this setup, no entry was processed twice. So something in your processors must be causing the same list item to be picked up by more than one thread.
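To make that point concrete, here is a minimal plain-Java sketch of the same check without Camel. It is not Camel's actual implementation; it only assumes, as the test above suggests, that the list is iterated by a single thread and each element is handed to a worker. The names SplitDemo, handle and processAll are hypothetical and exist only for this illustration. The handler receives its item as a parameter and keeps nothing in shared instance fields, which is the property worth checking in your own processors and beans: a singleton bean that stores "the current item" in a field can make two threads appear to work on the same item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SplitDemo {

    // Hypothetical handler: everything it needs arrives as a parameter,
    // so concurrent calls cannot overwrite each other's item.
    static void handle(String item, Map<String, Integer> seen) {
        // merge() is atomic on ConcurrentHashMap, so the count is reliable.
        seen.merge(item, 1, Integer::sum);
    }

    // Hands every list element to a fixed pool and counts how often each one is handled.
    static Map<String, Integer> processAll(List<String> items, int threads) {
        Map<String, Integer> seen = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // The list is only read here, by the submitting thread.
            for (String item : items) {
                pool.submit(() -> handle(item, seen));
            }
        } finally {
            pool.shutdown(); // accept no new tasks, let queued ones finish
        }
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            items.add("item_no_" + i);
        }
        Map<String, Integer> seen = processAll(items, 10);
        long duplicates = seen.values().stream().filter(c -> c > 1).count();
        System.out.println("distinct=" + seen.size() + " duplicates=" + duplicates);
    }
}
```

Because the ArrayList is never modified while the workers run, its lack of thread safety does not come into play here. If a similar counter inside your real processing route reports duplicates, the next place to look would be mutable state shared between threads, for example fields on the repository bean or on the processor behind direct:processReportChannel.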