There is a middleware between two other software components. In the middleware I'm routing Apache ActiveMQ messages with Apache Camel.
This is how it works:
1stComponent uses the middleware to send a message to 3rdComponent.
3rdComponent replies to the message and sends the reply back to 1stComponent (through the middleware).
1stComponent <<=>> Middleware <<=>> 3rdComponent
Problem:
I'm using concurrentConsumers in the middleware.
While a lot of messages are being sent sequentially, the middleware suddenly stops processing altogether.
No exception or error message is logged.
For example, the first 100 of 500 messages get processed and the rest remain in the queue as pending messages.
This warning is sometimes logged in the middle of the process:
[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }
This is the middleware's code:
private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
            .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    //some code
                }
            })
            .inOut("activemq2:queue:Q.3RD");
    }
}
And the 3rdComponent's code:
private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("activemq:queue:Q.3RD")
            .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    //some code
                }
            });
    }
}
When you're using the request/response paradigm (which you are, since you have an InOut in your route), a producer expects to get a response to each message it sends before it sends the next one. If two producers produce messages whose replies come to the same response destination, and they're watching that destination with the same selector, then it's a race whether a reply reaches the producer that's expecting it or the other one. If it goes to the other one, the producer that got the unexpected reply logs the warning you're seeing, and the producer that sent the original message reports a timeout waiting for a response.
That's why your "answer" (which doesn't actually answer your question, but only describes how to reproduce your problem reliably) is able to reproduce the problem by starting multiple parallel routes that send InOut messages to the same destination.
To fix the problem, you need to ensure that your producers will get only their own messages, either via separate reply destinations for each producer, via unique selectors for each producer, or by reducing the number of producers.
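To make the race concrete, here is a minimal sketch in plain Java. It is a simplified model and not Camel's actual reply manager: the class `ReplyCorrelator` and its methods are hypothetical names invented for illustration. Each producer keeps a table of the correlation IDs it is waiting on, and a reply whose ID is not in that table is dropped, which is the situation behind the "Reply received for unknown correlationID" warning.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-producer reply manager (not Camel's class):
// it tracks the correlation IDs of requests this producer is still waiting on.
class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register an outgoing request and return the future that its reply completes.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("Duplicate correlation ID: " + correlationId);
        }
        return reply;
    }

    // Deliver a reply. Returns false when no request with this ID is pending here,
    // i.e. the reply is "unknown" to this producer and gets ignored.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelator producerA = new ReplyCorrelator();
        ReplyCorrelator producerB = new ReplyCorrelator();

        String id = UUID.randomUUID().toString();
        CompletableFuture<String> waiting = producerA.register(id);

        // The reply reaches the wrong producer: B ignores it and A keeps waiting.
        System.out.println("B accepted: " + producerB.onReply(id, "pong"));
        System.out.println("A done: " + waiting.isDone());

        // The reply reaches the producer that sent the request.
        System.out.println("A accepted: " + producerA.onReply(id, "pong"));
        System.out.println("A done: " + waiting.isDone());
    }
}
```

In this model a reply that reaches the wrong producer is simply lost, so the sender can only time out. That is why each producer needs its own reply destination or its own selector.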
@Amin: Ralf is right. Adding to his reply, there may be two reasons the process stopped:
1 - The TTL expired.
2 - The clocks of the client and the server are out of sync.
If your issue is related to 1, set the "JMSExpiration" header.
If your issue is related to 2 (copied from another Stack Overflow post):
The clocks of the client and the broker need to be in sync for the expiry to work properly. If the clocks are not in sync, the expiry set by the client may already have passed when the message is received on the broker. Or the client's clock is ahead of the broker's, so the expiry is longer than the 10 secs.
To fix this, realign the time to be broker-based only. See http://activemq.apache.org/timestampplugin.html
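The clock-skew case comes down to simple arithmetic, sketched below in plain Java. It models only the rule that the sender stamps the expiration as its own clock plus the TTL, while the broker compares that stamp with its own clock. The class name `ExpirySkewDemo`, the 15-second skew and the sample clock value are assumptions chosen for illustration; only the 10-second TTL comes from the quoted text.

```java
// Minimal model of message expiry under clock skew (hypothetical helper class).
class ExpirySkewDemo {
    // The sender computes the expiration stamp from ITS clock.
    static long expirationStamp(long clientClockMillis, long ttlMillis) {
        return clientClockMillis + ttlMillis;
    }

    // The broker decides whether the message is expired using ITS clock.
    static boolean expiredOnBroker(long expirationMillis, long brokerClockMillis) {
        return brokerClockMillis >= expirationMillis;
    }

    // How long the message actually survives as seen from the broker's clock.
    static long effectiveLifetime(long expirationMillis, long brokerClockMillis) {
        return expirationMillis - brokerClockMillis;
    }

    public static void main(String[] args) {
        long brokerNow = 1_000_000L; // assumed broker clock reading
        long ttl = 10_000L;          // the 10 secs mentioned above
        long skew = 15_000L;         // assumed difference between the clocks

        // Client clock is behind the broker: the message is expired on arrival.
        long behind = expirationStamp(brokerNow - skew, ttl);
        System.out.println("client behind, expired on arrival: " + expiredOnBroker(behind, brokerNow));

        // Client clock is ahead of the broker: the message lives longer than the TTL.
        long ahead = expirationStamp(brokerNow + skew, ttl);
        System.out.println("client ahead, lifetime ms: " + effectiveLifetime(ahead, brokerNow));
    }
}
```

When the stamp is based on the broker's time instead, both the stamp and the check use the same clock and the skew drops out of the calculation.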
UPDATE:
My previous answer works, but it was not the complete solution.
The real mistake was producing non-unique CorrelationIDs (a bug in my random string generator). Simple! :|
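If the application sets correlation IDs itself, one way to avoid collisions is to derive them from `java.util.UUID` rather than a hand-written random string generator. This is only a sketch of that idea: the class `CorrelationIds` and its methods are hypothetical names, and it does not reflect how the original generator was written.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper that hands out correlation IDs.
class CorrelationIds {
    // UUID.randomUUID() returns a type 4 UUID built from a cryptographically
    // strong random source, so collisions are not a practical concern.
    static String next() {
        return UUID.randomUUID().toString();
    }

    // Generate n IDs and count how many distinct values came out.
    static int countDistinct(int n) {
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < n; i++) {
            seen.add(next());
        }
        return seen.size();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("generated " + n + ", distinct " + countDistinct(n));
    }
}
```

A check like `countDistinct` is also a cheap way to test an existing generator: any result smaller than `n` means two in-flight requests could end up sharing a correlation ID.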
Nobody knows why, but I found this solution:
It's possible to simulate this feature by using multiple routes, like this:
...
// ----<1st Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<2nd Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<3rd Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<4th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<5th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<6th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<7th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<8th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<9th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
// ----<10th Route>----
from("activemq:queue:Q.Middleware").threads(2, 20).inOut("activemq2:queue:Q.3RD");
...
It works fine, but managing the number of consumers (routes) this way is awkward: you have to copy and paste routes!