Delayed requeuing through Rabbit DLQ

2019-08-24 06:58发布

问题:

I am trying to set up a queue with delayed requeuing of failed messages following the pattern described here.

I tried copying the config example from the docs as closely as possible but the dead letter queue that was created was not itself bound back to the DLX. I am unclear as to why not.

I saw another potential solution though and instead of relying on the default behavior, I tried explicitly setting the dlqDeadLetterExchange and dlqDeadLetterRoutingKey properties to see if I could make that work. My config looks like this:

rabbit:
  bindings:
    input:
      consumer:
        autoBindDlq: true
        bindingRoutingKey: my-routing-key
        dlq-ttl: 5000
        deadLetterExchange: dead-letter-exchange
        deadLetterQueueName: my-queue-dl
        dlqDeadLetterExchange: dead-letter-exchange
        dlqDeadLetterRoutingKey: my-queue-dl

This very nearly works. All I have to do to complete the pattern is to manually add the binding to the DLX that routes messages with the 'my-queue-dl' routing key back to 'my-queue'. However, I have not found a way to do that with the documented spring-cloud-streams configuration properties. Is there a standard way to do that? Some sort of a 'dlqDeadLetterExchangeBinding' configuration?

回答1:

See Retry With the RabbitMQ Binder.

Set autoBindDlq to true - the binder will create a DLQ; you can optionally specify a name in deadLetterQueueName

Set dlqTtl to the back off time you want to wait between redeliveries

Set the dlqDeadLetterExchange to the default exchange - expired messages from the DLQ will be routed to the original queue since the default deadLetterRoutingKey is the queue name (destination.group)

The key phrase is "Set the dlqDeadLetterExchange to the default exchange" - the technique relies on the fact that every queue is bound to the default exchange with a routing key equal to the queue name.

I just tested it again and it works fine...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So48531769Application {

    public static void main(String[] args) {
        SpringApplication.run(So48531769Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) List<?> death) {
        System.out.println(in + ":" + death);
        throw new RuntimeException("failed");
    }

}

with

spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            max-attempts: '1'
          group: foo
      rabbit:
        bindings:
          input:
            consumer:
              autoBindDlq: 'true'
              dlq-dead-letter-exchange:
              dlq-ttl: '5000'

Notice that the dlq-dead-letter-exchange property is specified with no value - which means route expired messages to the default exchange.

input.foo

input.foo.dlq