Spring AMQP Project with 2 Queues

2020-03-01 02:39发布

问题:

I'm working on a project that involves two queues and multiple listeners interacting with them. Flow:

  • A new HTTP request comes to the server and is converted into an object that will be the message
  • This message has to be published to two queues
  • I have two types of listeners, one per queue; each gets the messages from its queue and then I do whatever I want with them

From what I've read, the best way to do this is with a fanout exchange. Here is my code:

listener-configuration.xml

<!-- CREATE CONNECTION FACTORY -->
<rabbit:connection-factory id="connectionFactory"
    host="localhost" username="guest" password="guest" />

<!-- Admin that declares the queues, exchange and bindings defined below on the broker -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- RABBIT QUEUES -->
<!-- Persistence queue -->
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false"
    durable="true" />
<!-- Webapp queue -->
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false"
    durable="true" />

<!-- CREATE A FANOUT EXCHANGE AND BIND BOTH QUEUES TO IT.
     A fanout exchange copies every message to each bound queue; routing keys are ignored. -->
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding>
        <rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- CREATE THE RABBIT TEMPLATES -->
<!-- "exchange" takes the broker-side exchange name (trashroute-exchange), not the bean id (myExchange) -->
<rabbit:template connection-factory="connectionFactory" exchange="trashroute-exchange" queue="trashroute.rabbit.queue"/>
<rabbit:template connection-factory="connectionFactory" exchange="trashroute-exchange" queue="trashroute2.rabbit.queue"/>

<!-- INSTANTIATE THE LISTENERS -->
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" />
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" />

<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />

<!-- GLUE EACH LISTENER TO ITS OWN QUEUE IN THE LISTENER CONTAINER -->
<rabbit:listener-container id="listenerContainer"
    connection-factory="connectionFactory" message-converter="jsonMessageConverter">
    <rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" />
    <rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" />
</rabbit:listener-container>

sender-configuration.xml

<!-- Creates a RabbitMQ connection factory with the specified parameters -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" />

<!-- Admin used to declare broker resources such as exchanges -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- Template the Java program calls to publish to trashroute-exchange.
     "exchange" takes the broker-side exchange name, not a bean id.
     NOTE(review): the jsonMessageConverter bean is not defined in this file; presumably it is
     provided by another configuration loaded into the same context. Confirm. -->
<rabbit:template id="template" connection-factory="connectionFactory"  exchange="trashroute-exchange"
message-converter="jsonMessageConverter" />


<!-- Plain-bean RabbitTemplate with its own JSON converter; uses the connection factory defined above -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageConverter">
    <bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/>
</property>
</bean>

Listener MainConfiguration.java

/**
 * Listener-side Spring configuration: broker connection, admin, the data controller
 * and the two durable queues the listeners consume from.
 */
@Configuration
public class MainConfiguration {

    /** Name of the queue read by the persistence listener. */
    protected final String persistenceQueue = "trashroute.rabbit.queue";

    /** Name of the queue read by the webapp listener. */
    protected final String webappQueue = "trashroute2.rabbit.queue";

    /**
     * Builds the connection factory for the broker on localhost using the guest account.
     *
     * @return the configured caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    /**
     * Creates the AMQP admin on top of {@link #connectionFactory()}.
     *
     * @return a {@link RabbitAdmin} bound to the connection factory bean
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        final ConnectionFactory factory = connectionFactory();
        return new RabbitAdmin(factory);
    }

    /**
     * Exposes a {@code DataController} instance as a bean.
     *
     * @return a new data controller
     */
    @Bean
    public DataController DataController() {
        final DataController controller = new DataController();
        return controller;
    }

    /**
     * Declares the durable persistence queue.
     * Every queue is bound to the default direct exchange.
     *
     * @return the persistence queue definition
     */
    @Bean
    public Queue persistenceQueue() {
        final boolean durable = true;
        return new Queue(persistenceQueue, durable);
    }

    /**
     * Declares the durable webapp queue.
     *
     * @return the webapp queue definition
     */
    @Bean
    public Queue webappQueue() {
        final boolean durable = true;
        return new Queue(webappQueue, durable);
    }
}

Sender MainConfiguration.java

/**
 * Sender-side Spring configuration: broker connection, a JSON-converting template,
 * the service manager and the bean post-processor for scheduling annotations.
 */
@Configuration
public class SenderConfiguration {

    /** Name of the queue consumed by the persistence listener. */
    protected final String persistenceQueue = "trashroute.rabbit.queue";

    /** Name of the queue consumed by the webapp listener. */
    protected final String webappQueue = "trashroute2.rabbit.queue";

    /**
     * Creates the template used to publish messages, converting payloads with
     * a {@code JsonMessageConverter}.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory());
        final JsonMessageConverter converter = new JsonMessageConverter();
        jsonTemplate.setMessageConverter(converter);
        return jsonTemplate;
    }

    /**
     * Builds the connection factory for the broker on localhost using the guest account.
     *
     * @return the configured caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    /**
     * Exposes the service manager implementation as a bean.
     *
     * @return a new {@code ServiceManagerImpl}
     */
    @Bean
    public IServiceManager scheduledProducer() {
        final IServiceManager manager = new ServiceManagerImpl();
        return manager;
    }

    /**
     * Registers the post-processor for scheduling annotations.
     *
     * @return a new {@code ScheduledAnnotationBeanPostProcessor}
     */
    @Bean
    public BeanPostProcessor postProcessor() {
        final BeanPostProcessor scheduling = new ScheduledAnnotationBeanPostProcessor();
        return scheduling;
    }

}

Can anyone please tell me what I'm doing wrong? One of the two listeners works perfectly, but the second never reads a message.

回答1:

Based on the scenario explained above, I have created a sample application that uses Spring Java Config.

Messages are published to trashroute and webapp queues, and respective receivers (persistence and webapp) receive the messages.

RabbitConfiguration.java (Contains configuration for both Sender and Receiver)

@Configuration
@EnableRabbit
public class RabbitConfiguration {

    public static final String BROADCAST_TRASHROUTE_QUEUE = "trashroute.rabbit.queue";
    public static final String BROADCAST_WEBAPP_QUEUE = "webapp.rabbit.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }


    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public Queue trashRouteQueue() {
        return new Queue(BROADCAST_TRASHROUTE_QUEUE);
    }

    @Bean
    public Queue webAppQueue() {
        return new Queue(BROADCAST_WEBAPP_QUEUE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    @Bean
    public FanoutExchange trashRouteExchange() {
        FanoutExchange exchange = new FanoutExchange("trashroute");
        return exchange;
    }

    @Bean
    public Binding trashRouteBinding() {
        return BindingBuilder.bind(trashRouteQueue()).to(trashRouteExchange());
    }

    @Bean
    public Binding webAppBinding() {
        return BindingBuilder.bind(webAppQueue()).to(trashRouteExchange());
    }

    @Bean
    SimpleMessageListenerContainer persistenceListenerContainer(ConnectionFactory connectionFactory, @Qualifier("persistenceListenerAdapter") MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(trashRouteQueue(), webAppQueue());
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter persistenceListenerAdapter(PersistenceListener receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    SimpleMessageListenerContainer webAppListenerContainer(ConnectionFactory connectionFactory, @Qualifier("webAppListenerAdapter") MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(trashRouteQueue(), webAppQueue());
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter webAppListenerAdapter(WebAppListener webAppListener) {
        return new MessageListenerAdapter(webAppListener, "receiveMessage");
    }

    @Bean
    PersistenceListener persistenceListener() {
        return new PersistenceListener();
    }

    @Bean
    WebAppListener webAppListener() {
        return new WebAppListener();
    }

}

PersistenceListener.java

/**
 * Receiver for the persistence side; prints every message it is handed.
 */
public class PersistenceListener {

    /**
     * Prints the received message to standard output.
     *
     * @param message the message payload
     */
    public void receiveMessage(String message) {
        System.out.println("Persistence Listener: Message Received <" + message + ">");
    }
}

WebAppListener.java

/**
 * Receiver for the webapp side; prints every message it is handed.
 */
public class WebAppListener {

    /**
     * Writes the received message to standard output.
     *
     * @param message the message payload
     */
    public void receiveMessage(String message) {
        final String line = String.format("WebAppListener: Message Received <%s>", message);
        System.out.println(line);
    }
}

Application.java

/**
 * Demo entry point: starts the Spring Boot context, publishes one message to the
 * fanout exchange, waits for the listeners to print it, then closes the context.
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    /** Name of the fanout exchange; must match the FanoutExchange bean declared in RabbitConfiguration. */
    private static final String BROADCAST_EXCHANGE = "trashroute";

    @Autowired
    AnnotationConfigApplicationContext context;

    /**
     * Boots the application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * Publishes a single broadcast message and shuts the context down afterwards.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the thread is interrupted while sleeping or publishing fails
     */
    @Override
    public void run(String... args) throws Exception {
        System.out.println("Waiting five seconds...");
        Thread.sleep(5000);
        System.out.println("Sending message...");

        // Typed lookup of the same bean name avoids the unchecked cast.
        RabbitTemplate rabbitTemplate = context.getBean("rabbitTemplate", RabbitTemplate.class);

        // Publish once to the fanout exchange; the broker copies the message to every bound
        // queue. A fanout exchange ignores the routing key, so an empty one is passed.
        rabbitTemplate.convertAndSend(BROADCAST_EXCHANGE, "", "Hello from the trashroute exchange!");

        // Give the listeners time to consume before the context is closed.
        Thread.sleep(10000);
        context.close();
    }
}

I hope this helps, although you would need to refactor the code before using it in production.