There is a middleware
in between of two other software components. In the middleware
I'm routing Apache ActiveMQ
messages by Apache Camel
.
this is how it works:
1stComponent
usesmiddleware
to send message to the3rdComponent
3rdComponent
replies the message and sends it back to the1st
(usingmiddleware
).1stComponent <<=>> Middleware <<=>> 3rdComponent
Problem:
I'm using ConcurrentConsumers
in middleware.
In the middle of sending a lot of messages sequentially, suddenly middleware
stops all the process!
there is no exceptions or messages!
for example, first 100 of 500 messages got processed and the remainders remain in the queue as pending messages.
this warning is logged sometimes in the middle of the process:
[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }
this is the middlewares
Code:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
.inOut("activemq2:queue:Q.3RD")
;
}
}
and the 3rdComponent
:
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
from("activemq:queue:Q.3RD")
.threads(1, 100)
.process(new Processor() {
public void process(Exchange exchange) {
//some code
}
})
;
}
}
Nobody knows why! but I found this solution:
It's possible to simulate this feature by using multiple routes like this:
It is working fine but managing it's number of consumers(Routes) is not appropriate! you should copy & paste routes!
@Amin Ralf is right adding more to his reply--- there may be two reasons the process stopped 1 - TTL expired 2 - Time between client and server is out of sync.
If your issue is related to 1 then set header - "JMSExpiration"
If your issue is related to 2 - (copy pasted from another stackoverflow post)
then the clocks between client - broker need to be in sync, for the expiry to work properly. If the clocks is not in sync, then the expiry set from the client, may already be expired when the message is received on the broker. Or the client time is ahead of the broker, so the expiry is longer than the 10 secs.
To fix this by realigning the time, to be broker based only. See http://activemq.apache.org/timestampplugin.html
When you're using the request/response paradigm (which you're using since you have an InOut in your route), a producer expects to get a responses to each message it sends before it sends the next one. If two producers produce messages that will come to the same response destination and they're watching that destination with the same selector, then it's a race condition whether a message comes to the producer that's expecting it or the other one, and if it goes to the other one, you'll get the message you're seeing in the producer that got the unexpected reply, and a message indicating a timeout waiting for a response in the producer that sent the original message.
That's why your "answer" (which doesn't actually answer your question, only describe how to reproduce your problem reliably) is able to reproduce the problem by starting multiple parallel routes that send InOut messages to the same destination.
To fix the problem, you need to ensure that your producers will get only their own messages, either via separate reply destinations for each producer, via unique selectors for each producer, or by reducing the number of producers.
UPDATE:
My previous answer is working correctly, but was not the complete solution.
The mistake was producing non-unique
CorrelationIDs
! (a bug in random string generators) simple!!! :|