Adding values to header in MassTransit.RabbitMq

Posted 2019-06-22 10:53

Question:

I am using MassTransit 3.0.0.0 and I am having a hard time understanding how to intercept messages in a Request-Response scenario on their way out and add some information to the headers that I can read on the receiver's end.

I looked at the middleware, as recommended in the MassTransit docs (see the Observers warning), but the context you get on Send is just a pipe context that doesn't give access to the Headers field, so I cannot alter it. I used the sample provided on the Middleware page.

I then looked at IPublishInterceptor:

public class X<T> : IPublishInterceptor<T> where T : class, PipeContext
{
    // Each method returns an already-completed task. A task built with
    // new Task(...) is never started, so awaiting it would never finish.
    public Task PostPublish(PublishContext<T> context)
    {
        return Task.FromResult(0);
    }

    public Task PostSend(PublishContext<T> context, SendContext<T> sendContext)
    {
        return Task.FromResult(0);
    }

    public Task PrePublish(PublishContext<T> context)
    {
        // Attach a fresh ID header before the message is published.
        context.Headers.Set("ID", Guid.NewGuid().ToString());
        return Task.FromResult(0);
    }

    public Task PreSend(PublishContext<T> context, SendContext<T> sendContext)
    {
        // Attach a fresh ID header before the message is sent.
        context.Headers.Set("ID", Guid.NewGuid().ToString());
        return Task.FromResult(0);
    }
}

The interface itself is clear and concise. However, I don't know where it is used or how to hook it into the rest of the infrastructure. As it stands, this is just an implementation of an interface that is not registered anywhere.

Answer 1:

If you need to add headers when a message is being sent, you can add middleware components to either the Send or the Publish pipeline as shown below. Note that Send filters will apply to all messages, whereas Publish filters will only apply to messages which are published.

// execute a synchronous delegate on send
cfg.ConfigureSend(x => x.Execute(context => {}));

// execute a synchronous delegate on publish
cfg.ConfigurePublish(x => x.Execute(context => {}));

The middleware can be configured on either the bus or individual receive endpoints, and each configuration applies only where it is declared.
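To make this concrete for the question's "ID" header, here is a minimal sketch rather than a definitive implementation. It assumes the Execute extension shown above (later MassTransit releases document it as UseExecute), that the send context exposes Headers.Set as in the question's code, and that the receiving side can read the value with Headers.TryGetHeader. The host URI, the guest credentials, and the MyMessageConsumer class are placeholders for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Placeholder message type, named after the MyMessage used elsewhere on this page.
public class MyMessage
{
}

// Hypothetical consumer showing how the receiver might read the header.
public class MyMessageConsumer : IConsumer<MyMessage>
{
    public Task Consume(ConsumeContext<MyMessage> context)
    {
        object id;
        // Assumption: TryGetHeader returns false when the header is absent.
        if (context.Headers.TryGetHeader("ID", out id))
            Console.WriteLine("ID header: {0}", id);

        return Task.FromResult(0);
    }
}

public static class Program
{
    public static void Main()
    {
        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Placeholder broker address and credentials.
            cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            // Stamp every outgoing message with a fresh ID header.
            // Assumption: Execute is the extension from the snippet above.
            cfg.ConfigureSend(x => x.Execute(context =>
                context.Headers.Set("ID", Guid.NewGuid().ToString())));
        });
    }
}
```

Because the filter sits on the Send pipeline, it should also cover published messages and request/response traffic, per the note above that Send filters apply to all messages.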



Answer 2:

You can also add headers in the consumer class:

public async Task Consume(ConsumeContext<MyMessage> context)
{
    ....
    await context.Publish<MyEvent>(new { Data = data }, c => AddHeaders(c));
}

public static void AddHeaders(PublishContext context)
{
    context.Headers.Set("CausationId", context.MessageId);
}


Answer 3:

http://masstransit-project.com/MassTransit/advanced/middleware/custom.html

That page shows how to add an extension method that makes it clear what you are setting up. That is a big help if the interceptor will be used a lot, because it makes its purpose obvious. You can skip that step if you want.

Basically, just add the pipe specification when configuring the transport:

cfg.AddPipeSpecification(new X<MyMessage>());
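Whatever interceptor you register this way, methods that have no asynchronous work should return an already-completed task rather than constructing one with new Task(() => { }). The Task constructor produces a task that nothing ever starts, so a pipeline awaiting it would wait forever. The sketch below uses only the .NET base class library to show the difference; CompletedTaskDemo and its method names are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletedTaskDemo
{
    // The Task constructor creates a "cold" task that nothing ever starts.
    public static Task NeverStarted()
    {
        return new Task(() => { });
    }

    // Returns a task that is already complete. Task.FromResult works on
    // .NET 4.5; Task.CompletedTask is the usual choice on .NET 4.6 and later.
    public static Task AlreadyCompleted()
    {
        return Task.FromResult(0);
    }

    public static void Main()
    {
        Console.WriteLine(NeverStarted().Status);      // Created
        Console.WriteLine(AlreadyCompleted().Status);  // RanToCompletion
    }
}
```

Awaiting the first task would never resume, while awaiting the second continues immediately.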