Spring RabbitMQ - using manual channel acknowledge

Posted 2020-01-30 06:43

Question:

How can I acknowledge messages manually instead of using automatic acknowledgement? Is there a way to do this with the @RabbitListener and @EnableRabbit style of configuration? Most of the documentation says to use SimpleMessageListenerContainer together with ChannelAwareMessageListener, but then we lose the flexibility that the annotations provide. I have configured my service as below:

@Service
public class EventReceiver {

    @Autowired
    private MessageSender messageSender;

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order) throws Exception {
        // code for processing order
    }
}

My RabbitConfiguration is as below:

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        // Note: MappingJackson2MessageConverter comes from spring-messaging and does not
        // implement the AMQP MessageConverter interface, so this cast is likely to fail at runtime.
        factory.setMessageConverter((MessageConverter) jackson2Converter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myRabbitListenerContainerFactory());
    }

    @Autowired
    private EventReceiver receiver;
}

Any help on how to adapt manual channel acknowledgement to the above style of configuration would be appreciated. If we implement ChannelAwareMessageListener, the onMessage signature changes. Can we implement ChannelAwareMessageListener on a service?

Answer 1:

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

and use the tag in the basicAck or basicReject call.
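
As a rough illustration of how the tag might be used, here is a minimal plain-Java sketch of the ack-on-success, reject-on-failure pattern. AckChannel, OrderProcessor, and RecordingChannel are hypothetical stand-ins invented for this example so that it runs without a broker; in a real listener the two calls would go to the com.rabbitmq.client.Channel passed into the @RabbitListener method.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here,
// so the sketch compiles without the RabbitMQ client on the classpath.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
}

// Hypothetical business logic; throwing signals a processing failure.
interface OrderProcessor {
    void process(String order) throws Exception;
}

class ManualAckSketch {

    // Acknowledge on success, reject (and requeue) on failure.
    // Exactly one of basicAck / basicReject is called per delivery tag.
    static void handle(String order, AckChannel channel, long tag, OrderProcessor processor)
            throws IOException {
        try {
            processor.process(order);
        } catch (Exception e) {
            channel.basicReject(tag, true); // true = ask the broker to requeue
            return;
        }
        channel.basicAck(tag, false); // false = acknowledge only this delivery
    }

    // Test double that records calls instead of talking to a broker.
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag + ":" + multiple);
        }

        @Override
        public void basicReject(long deliveryTag, boolean requeue) {
            calls.add("reject:" + deliveryTag + ":" + requeue);
        }
    }
}
```

The ack is placed after the try block so that a failure of basicAck itself is not caught and answered with a second call for the same tag.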

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual
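
The second argument of basicAck in the listener above is the multiple flag: false acknowledges only the given delivery tag, while true acknowledges every outstanding tag up to and including it. The toy model below is only an illustration of that meaning. UnackedTagsSketch is a hypothetical class made up for this example and is not part of the RabbitMQ client.

```java
import java.util.TreeSet;

// Toy model of a channel's outstanding (unacknowledged) delivery tags.
// Illustration only; this is not the RabbitMQ client API.
class UnackedTagsSketch {

    private final TreeSet<Long> unacked = new TreeSet<>();

    // Record that a message with this tag was delivered and awaits an ack.
    void deliver(long tag) {
        unacked.add(tag);
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple).
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.headSet(tag, true).clear(); // every tag <= the given one
        } else {
            unacked.remove(tag); // only the given tag
        }
    }

    // Defensive copy of the tags that are still unacknowledged.
    TreeSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }
}
```

After delivering tags 1 to 4, ack(2, false) leaves 1, 3 and 4 outstanding, and a following ack(3, true) leaves only 4.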


Answer 2:

If you need to use onMessage() from the ChannelAwareMessageListener interface, you can do it this way:

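@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    // Assumption: an SLF4J logger; the original snippet used "log" without declaring it.
    private static final Logger log = LoggerFactory.getLogger(MyMessageListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("Message received.");
        // do something with the message
        // acknowledge only this delivery (multiple = false)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}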

And for the Rabbit configuration:

@Configuration
public class RabbitConfig {

public static final String topicExchangeName = "exchange1";

public static final String queueName = "queue1";

public static final String routingKey = "queue1.route.#";

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("xxxx");
    connectionFactory.setPassword("xxxxxxxxxx");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("vHost1");
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    return new RabbitTemplate(connectionFactory());
}

@Bean
Queue queue() {
    return new Queue(queueName, true);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}


@Bean
public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory());
    listenerContainer.setQueueNames(queueName);
    listenerContainer.setMessageListener(myRabbitMessageListener);
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listenerContainer.setConcurrency("4");
    listenerContainer.setPrefetchCount(20);
    return listenerContainer;
}

}



Answer 3:

Thanks to Gary for the help. I finally solved the issue, and I am documenting the solution for the benefit of others. This should really be covered in the Spring AMQP reference documentation. The service class is as below.

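@Service
public class Consumer {

    // Assumption: the queue name is injected from the same property the listener uses.
    @Value("${eventqueue}")
    private String eventQueue;

    // The method name can be anything, but the method must declare a Channel parameter.
    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order, Channel channel) throws Exception {

        channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
        // Get the delivery tag.
        // Caution: basicGet fetches another message from the queue (or returns null if
        // the queue is empty) rather than describing the Order passed to this method;
        // the @Header(AmqpHeaders.DELIVERY_TAG) parameter shown in the first answer
        // supplies the tag of the message actually being handled.
        long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
        try {
            // code for processing order
        } catch (Exception e) {
            // handle exception
            channel.basicReject(deliveryTag, true);
            return; // do not also ack a rejected delivery
        }
        // If all logic is successful
        channel.basicAck(deliveryTag, false);
    }
}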

The configuration has also been modified as below:

public class RabbitApplication implements RabbitListenerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(RabbitApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Autowired
    private Consumer consumer;

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

      ...
}

Note: there is no need to configure a RabbitMQ ConnectionFactory, a listener container factory, etc., since the annotation-based setup implicitly takes care of all this.