I'm working on a project that involves 2 queue's, and multiples Listeners interacting with them. Flow:
- New HTTP request comes to the server, then it's converted into a Object that will be the message
- This message has to be published in two queues
- I have two types of Listeners that get messages from each queue and then I do whatever I want
I've been reading and the best way to do is with a fanout-exchange. Here is my code:
listener-configuration.xml
<!-- CREATE CONNECTION FACTORY -->
<rabbit:connection-factory id="connectionFactory"
host="localhost" username="guest" password="guest" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- <!-- RABBIT QUEUE'S -->
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup=false
durable="true" />
<!-- Webapp Queue -->
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup=false
durable="true" />
<!-- CREATE AN EXCHANGE AND BIND THE QUEUE WITH MY.ROUTINGKEY.* TO THE EXCHANGE -->
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange">
<rabbit:bindings>
<rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding>
<rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- CREATE THE RABBIT TEMPLATES -->
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute.rabbit.queue"/>
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute2.rabbit.queue"/>
<!-- INSTANTIATE THE LISTENERS -->
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" />
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" />
<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />
<!-- GLUE THE LISTENER AND QUEUE TO THE LISTENER CONTAINER -->
<rabbit:listener-container id="listenerContainer"
connection-factory="connectionFactory" message-converter="jsonMessageConverter">
<rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" />
<rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" />
</rabbit:listener-container>
sender-configuration.xml
<!-- First following line creates a rabbit connection factory with specified parameters -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" />
<!-- Obtain admin rights to create an exchange -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- Create a bean which can send message to trashroute-exchange for the Java program to call -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="myExchange"
message-converter="jsonMessageConverter" />
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/>
</property>
Listener MainConfiguration.java
@Configuration
public class MainConfiguration {
protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public DataController DataController(){
return new DataController();
}
@Bean
// Every queue is bound to the default direct exchange
public Queue persistenceQueue() {
//Create a new queue with an specific name and the durability value in true.
return new Queue(this.persistenceQueue, true);
}
@Bean
public Queue webappQueue() {
//Create a new queue with an specific name and the durability value in true.
return new Queue(this.webappQueue, true);
}
}
Sender MainConfiguration.java
@Configuration
public class SenderConfiguration {
protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";
//Create the Template
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new JsonMessageConverter());
return template;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
"localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public IServiceManager scheduledProducer() {
return new ServiceManagerImpl();
}
@Bean
public BeanPostProcessor postProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
Can anyone please tell me what I'm doing wrong? One of the two Listeners, works perfectly, the second never reads a message.