I'm new to spring integration and am confused about how to send error messages to a designated error queue. I want the error message to be a header on the original message and end up in a separate queue. I read that this can be done with a header enricher, which I tried to implement but nothing is showing up in the error queue.
Also, do I need a separate exception handling class in order for the error messages to make it to the error queue or can I just throw exceptions in my transforming methods?
Here is my xml config:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="first" auto-delete="false" durable="true" />
<rabbit:queue name="second" auto-delete="false" durable="true" />
<rabbit:queue name="errorQueue" auto-delete="false" durable="true" />
<int:poller default="true" fixed-rate="100"/>
<rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="second" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="errorQueue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange" amqp-template="amqpTemplate" />
<int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />
<int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange" amqp-template="amqpTemplate" />
<int:channel id="messageInputChannel" />
<int:channel id="messageOutputChannel"/>
<int:channel id="errorInputChannel"/>
<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" >
<bean class="firstAttempt.MessageErrorHandler"/>
<int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
<int:header-enricher>
<int:error-channel ref="errorInputChannel" />
</int:header-enricher>
<int:transformer method = "convert" >
<bean class="firstAttempt.JsonObjectConverter" />
</int:transformer>
<int:service-activator method="transform">
<bean class="firstAttempt.Transformer" />
</int:service-activator>
<int:object-to-string-transformer />
</int:chain>
</beans>
Error Class:
public class ErrorHandler {
public String errorHandle(MessageHandlingException exception) {
return exception.getMessage();
QualityScorer class (called by transformer):
public class QualityScorer {
private Hashtable<String, String> table;
private final static String csvFile = "C:\\Users\\john\\Test.csv";
public QualityScorer() throws Exception {
table = new Hashtable<String, String>();
initializeTable();
}
private void initializeTable() throws Exception {
BufferedReader br = null;
String line = "";
String cvsSplitBy = ",";
try {
br = new BufferedReader(new FileReader(csvFile));
while ((line = br.readLine()) != null) {
String[] data = line.split(cvsSplitBy);
if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
table.putIfAbsent(data[3], data[1]);
}
} catch (FileNotFoundException e) {
throw new Exception("No file found");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public float getScore(JSONObject object) throws Exception {
float score;
if (object == null) {
throw new IllegalArgumentException("object");
}
if (!object.has("source")) {
throw new Exception("Object does not have a source");
}
if (!object.has("employer")) {
throw new Exception("Object does not have an employer");
}
String source = object.getString("Source");
String employer = object.getString("employer");
if (table.containsKey(employer) && !source.equals("packageOne")) {
score = 1;
} else {
score = -1;
}
return score;
}
}
Right now, the message being loaded has no source, so the program should be throwing the MessagingException to the MessageErrorHandler.
Transformer code:
public class Transformer {
private QualityScorer qualityScorer;
public Transformer() throws Exception {
qualityScorer = new QualityScorer();
}
public JSONObject transform(JSONObject object) throws Exception {
float score = qualityScorer.getScore(object);
object.put("score", score);
return object;
}
}
All together, the program should receive a pre-loaded message from a queue, transform it and send it on to a second queue, which it does successfully if the source is provided in the pre-loaded message. I'm trying to handle errors and make it so they are sent to an error queue as a message header. This issue has been frustrating me for awhile, so help is greatly appreciated!
The error currently being shown in the stacktrace is:
java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)
But nothing is going to the error queue.
When the exception is thrown, it is wrapped together with the
requestMessage
to theMessagingException
. Your own business exception is in thecause
and you can get access to therequestMessage
from theMessagingException.failedMessage
property.So, it looks like you have everything you need for your use-case. Only the problem that before sending to the
error-exchange
you really should have some<transformer>
in the error flow to properly convert thatMessagingException
to the proper message to send to the AMQP.