I am trying to set up a queue with delayed requeuing of failed messages following the pattern described here.
I tried copying the config example from the docs as closely as possible but the dead letter queue that was created was not itself bound back to the DLX. I am unclear as to why not.
I saw another potential solution though and instead of relying on the default behavior, I tried explicitly setting the dlqDeadLetterExchange and dlqDeadLetterRoutingKey properties to see if I could make that work. My config looks like this:
rabbit:
  bindings:
    input:
      consumer:
        autoBindDlq: true
        bindingRoutingKey: my-routing-key
        dlq-ttl: 5000
        deadLetterExchange: dead-letter-exchange
        deadLetterQueueName: my-queue-dl
        dlqDeadLetterExchange: dead-letter-exchange
        dlqDeadLetterRoutingKey: my-queue-dl
This very nearly works. All I have to do to complete the pattern is manually add a binding on the DLX that routes messages with the 'my-queue-dl' routing key back to 'my-queue'. However, I have not found a way to do that with the documented Spring Cloud Stream configuration properties. Is there a standard way to do that? Some sort of 'dlqDeadLetterExchangeBinding' property?
See Retry With the RabbitMQ Binder.
Set autoBindDlq to true - the binder will create a DLQ; you can optionally specify a name in deadLetterQueueName
Set dlqTtl to the back off time you want to wait between redeliveries
Set the dlqDeadLetterExchange to the default exchange - expired messages from the DLQ will be routed to the original queue since the default deadLetterRoutingKey is the queue name (destination.group)
The key phrase is "Set the dlqDeadLetterExchange to the default exchange" - the technique relies on the fact that every queue is bound to the default exchange with a routing key equal to the queue name.
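To make the routing concrete, here is a minimal sketch in plain Java (no Spring or RabbitMQ client) of the queue arguments this setup amounts to on the DLQ. The class and method names are hypothetical and only illustrate the idea; the `x-message-ttl`, `x-dead-letter-exchange` and `x-dead-letter-routing-key` argument names are the standard RabbitMQ ones, but this is not the binder's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class DlqArgumentsSketch {

    // Hypothetical helper: arguments for the DLQ. Messages expire after
    // ttlMillis and are dead-lettered to the default exchange (empty name)
    // with the original queue's name as the routing key.
    static Map<String, Object> dlqArguments(String originalQueue, long ttlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);
        args.put("x-dead-letter-exchange", "");
        args.put("x-dead-letter-routing-key", originalQueue);
        return args;
    }

    // Models the default exchange: every queue is bound to it with a routing
    // key equal to its own name, so the routing key is the target queue.
    static String routeViaDefaultExchange(String routingKey) {
        return routingKey;
    }

    public static void main(String[] args) {
        Map<String, Object> dlq = dlqArguments("input.foo", 5000L);
        String target = routeViaDefaultExchange((String) dlq.get("x-dead-letter-routing-key"));
        System.out.println("expired messages go back to: " + target);
    }
}
```

Because the default exchange needs no explicit binding, nothing has to be added by hand; a named DLX, as in the question, would need that extra binding.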
I just tested it again and it works fine...
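import java.util.List;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So48531769Application {

    public static void main(String[] args) {
        SpringApplication.run(So48531769Application.class, args);
    }

    // Always fails, so every delivery is dead-lettered and later redelivered.
    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) List<?> death) {
        System.out.println(in + ":" + death);
        throw new RuntimeException("failed");
    }
}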
with
spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            max-attempts: '1'
          group: foo
      rabbit:
        bindings:
          input:
            consumer:
              autoBindDlq: 'true'
              dlq-dead-letter-exchange:
              dlq-ttl: '5000'
Notice that the dlq-dead-letter-exchange property is specified with no value - which means route expired messages to the default exchange.
The resulting queues are input.foo and input.foo.dlq.
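The listener above fails every time, so the message cycles between the two queues indefinitely. The following is a minimal sketch of using the x-death header to cap the number of redeliveries. It assumes the header is a list of maps whose first entry carries a numeric "count", which is how RabbitMQ normally populates it; the class and method names and the limit of 3 are hypothetical, and what to do once the limit is reached (discard, park, log) is left out.

```java
import java.util.List;
import java.util.Map;

public class XDeathSketch {

    // Returns the "count" of the first x-death entry, or 0 when the header
    // is absent (first delivery) or the entry has no numeric count.
    static long deathCount(List<Map<String, ?>> xDeath) {
        if (xDeath == null || xDeath.isEmpty()) {
            return 0L;
        }
        Object count = xDeath.get(0).get("count");
        return count instanceof Number ? ((Number) count).longValue() : 0L;
    }

    // Hypothetical policy: stop retrying once the count reaches maxRetries.
    static boolean shouldGiveUp(List<Map<String, ?>> xDeath, long maxRetries) {
        return deathCount(xDeath) >= maxRetries;
    }

    public static void main(String[] args) {
        // Sample header entry, shaped like what RabbitMQ adds after an expiry.
        Map<String, Object> entry =
                Map.of("queue", "input.foo.dlq", "reason", "expired", "count", 3L);
        List<Map<String, ?>> header = List.of(entry);
        System.out.println(shouldGiveUp(header, 3)); // prints true
    }
}
```

In the listener, the same check could run before the exception is thrown, with the header parameter typed as a list of maps rather than List<?>.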