How to acknowledge the messages manually without using auto acknowledgement.
Is there a way to use this along with the @RabbitListener
and @EnableRabbit
style of configuration.
Most of the documentation tells us to use SimpleMessageListenerContainer
along with ChannelAwareMessageListener
.
However using that we lose the flexibility that is provided with the annotations.
I have configured my service as below :
@Service
public class EventReceiver {
@Autowired
private MessageSender messageSender;
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {
// code for processing order
}
My RabbitConfiguration is as below
@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter((MessageConverter) jackson2Converter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(myRabbitListenerContainerFactory());
}
@Autowired
private EventReceiver receiver;
}
}
Any help will be appreciated on how to adapt manual channel acknowledgement along with the above style of configuration.
If we implement the ChannelAwareMessageListener then the onMessage signature will change.
Can we implement ChannelAwareMessageListener on a service ?
Add the Channel
to the @RabbitListener
method...
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
...
}
and use the tag in the basicAck
, basicReject
.
EDIT
@SpringBootApplication
@EnableRabbit
public class So38728668Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
context.close();
}
@Bean
public Queue so38728668() {
return new Queue("so38728668");
}
@Bean
public Listener listener() {
return new Listener();
}
public static class Listener {
private final CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "so38728668")
public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
System.out.println(payload);
channel.basicAck(tag, false);
latch.countDown();
}
}
}
application.properties:
spring.rabbitmq.listener.acknowledge-mode=manual
Just in case you need to use #onMessage() from ChannelAwareMessageListener class. Then you can do it this way.
@Component
public class MyMessageListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) {
log.info("Message received.");
// do something with the message
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
And for the rabbitConfiguration
@Configuration
public class RabbitConfig {
public static final String topicExchangeName = "exchange1";
public static final String queueName = "queue1";
public static final String routingKey = "queue1.route.#";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("xxxx");
connectionFactory.setPassword("xxxxxxxxxx");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vHost1");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
@Bean
public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueueNames(queueName);
listenerContainer.setMessageListener(myRabbitMessageListener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setConcurrency("4");
listenerContainer.setPrefetchCount(20);
return listenerContainer;
}
}
Thanks for gary's help. I finally solved the issue. I am documenting this for the benefit of others.
This needs to be documented as part of standard documentation in Spring AMQP reference documentation page.
Service class is as below.
@Service
public class Consumer {
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel) throws Exception {
// the above methodname can be anything but should have channel as second signature
channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
// Get the delivery tag
long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
try {
// code for processing order
catch(Exception) {
// handle exception
channel.basicReject(deliveryTag, true);
}
// If all logic is successful
channel.basicAck(deliveryTag, false);
}
the configuration has also been modified as below
public class RabbitApplication implements RabbitListenerConfigurer {
private static final Logger log = LoggerFactory.getLogger(RabbitApplication .class);
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(jackson2Converter());
return factory;
}
@Autowired
private Consumer consumer;
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
...
}
Note: no need to configure Rabbitconnectionfactory or containerfactor etc since the annotation implicity take care of all this.