Processing multiple message in parallel with Activ

2019-04-12 11:59发布

I'd like to process messages in a queue in parallel using a simple Processor/AsyncProcessor as a destination. The processor takes a little time per message, but each message can be handled seperately, and thus at the same time (within healthy boundaries).

I'm having a hard time finding examples, especially about the xml configuration of camel routes.

So far, I've defined a threadpool, route and processor:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
    <from uri="broker:queue:inbox" />
    <threads executorServiceRef="smallPool">
        <to uri="MyProcessor" />
    </threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />

and my processor looks like:

public class MyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        String msg = in.getBody(String.class);      
        System.out.println(msg);
        try {
            Thread.sleep(10 * 1000); // Do something in the background
        } catch (InterruptedException e) {}
        System.out.println("Done!");
    }
}

Unfortunatly, when I post messages to the queue, they are still processed one by one, each delayed by 10 seconds (my "background task").

Can anyone point me to the right direction to have the messages processed using the defined threadpool or explain what I am doing wrong?

1条回答
男人必须洒脱
2楼-- · 2019-04-12 12:33

You should use the concurrentConsumers options as said in the comments,

<route>
    <from uri="broker:queue:inbox?concurrentConsumers=5" />
    <to uri="MyProcessor" />
</route>

Notice there is also maxConcurrentConsumers you can set to use a min/max range of concurrent consumers, so Camel will automatic grow/shrink depending on load.

See more details in the JMS docs at

查看更多
登录 后发表回答