-->

How to write a MassTransit Json Deserializer for A

2020-07-17 15:41发布

问题:

here it is how I am publishing to an object to event grid. I want to be able to use the azure service bus to listen to it.

        public void Publicar<T>(T model, string operation, string entity)
    {
        _nomeEvento = entity + operation;

        Boolean.TryParse(Configuration["EventGridConfig:Enabled"], out var eventGridIsActive);
        if (!eventGridIsActive)
            return;

        var primaryTopicKey = Configuration["EventGridConfig:AcessKey"];
        var primaryTopic = Configuration["EventGridConfig:Endpoint"];

        var primaryTopicHostname = new Uri(primaryTopic).Host;

        var topicCredentials = new TopicCredentials(primaryTopicKey);
        var client = new EventGridClient(topicCredentials);

        client.PublishEventsAsync(primaryTopicHostname, GetEventsList(model)).GetAwaiter().GetResult();
    }

    private List<EventGridEvent> GetEventsList<T>(T model)
    {
        return new List<EventGridEvent>
        {
            new EventGridEvent()
            {
                Id = Guid.NewGuid().ToString(),
                EventType = _nomeEvento,
                Data = model,
                EventTime = DateTime.Now,
                Subject = "MS_Clientes",
                DataVersion = "1.0",
            }
        };
    }

here it is how i am connection to the service bus

    static class CustomExtensionsMethods
{
    public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
        IHostingEnvironment env)
    {
        services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
        services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var keyName = "RootManageSharedAccessKey";
            var busName = configuration["ServiceBus:Name"];
            var secret = configuration["ServiceBus:Secret"];
            var host = cfg.Host(
                "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                "SharedAccessKeyName=" + keyName + ";" +
                "SharedAccessKey=" + secret,
                z =>
                {
                    TokenProvider
                        .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                });
            cfg.ConfigureJsonSerializer(settings =>
            {
                settings.Converters.Add(new InterfaceConverter());

                return settings;
            });
            cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
            cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
        }));
        services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
        services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
        services.AddSingleton<IHostedService, BusService>();
        return services;
    }
}

but then I get the same error

    fail: MassTransit.Messages[0]
      R-FAULT sb://dev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

I have tried to add a JsonConverter I found online, but no luck

    public class InterfaceConverter : JsonConverter
    {
        public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
        {
            serializer.Serialize(writer, value);
        }

        public override object ReadJson(JsonReader reader, Type objectType, object existingValue,
            JsonSerializer serializer)
        {
            // Set TypeNameHandling to Auto for deserializing objects with $type
            // Should be set directly in ConfigureJsonDeserializer when setting up MT Service bus
            serializer.TypeNameHandling = TypeNameHandling.Auto;
            return serializer.Deserialize(reader);
        }

        public override bool CanConvert(Type objectType)
        {
            return objectType.IsInterface;
        }
    }

回答1:

I tried a couple of tests and came up with a working solution. In my test case, I am redirecting messages from the EventGridTopic to the ServiceBusQueue, just like in your case - if I understood well.

Since MassTransit requires messages to be in a certain format in order to interpret them, we need to make sure to have the following:

  1. Custom deserializer for Event grid messages which are the type of EventGridEvent
  2. Make sure that all the messages that have to be consumed by MassTransit have ContentType - without this, it won't work

Therefore I built an example that can work if you are redirecting messages from EventGrid, but also if you pipe messages directly to the Service Bus. The following code is an example of how to implement deserializer for EventGrid messages:

public class EventGridMessgeDeserializer : IMessageDeserializer
    {
        private string _contentType;

        public EventGridMessgeDeserializer(string contentType)
        {
            _contentType = contentType;
        }
        public ContentType ContentType => new ContentType(_contentType);

        public ConsumeContext Deserialize(ReceiveContext receiveContext)
        {
            var body = Encoding.UTF8.GetString(receiveContext.GetBody());
            var customMessage = JsonConvert.DeserializeObject<EventGridEvent>(body);
            var serviceBusSendContext = new AzureServiceBusSendContext<EventGridEvent>(customMessage, CancellationToken.None);

            // this is the default scheme, that has to match in order messages to be processed
            // EventGrid messages type of EventGridEvent within namespace Microsoft.Azure.EventGrid.Models
            string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" };
            var serviceBusContext = receiveContext as ServiceBusReceiveContext;
            serviceBusSendContext.ContentType = new ContentType(JsonMessageSerializer.JsonContentType.ToString());
            serviceBusSendContext.SourceAddress = serviceBusContext.InputAddress;
            serviceBusSendContext.SessionId = serviceBusContext.SessionId;

            // sending JToken because we are using default Newtonsoft deserializer/serializer
            var messageEnv = new JsonMessageEnvelope(serviceBusSendContext, JObject.Parse(body), messageTypes);
            return new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

The important part here is that you specify in your custom deserializer what is the message type(s). Since MassTransit requires some format and ignores messages that do not comply, this is the place where we specify that piece of information as MassTransit requires.

string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" }

This is the default scheme, that has to match in order messages to be processed

And finally, full code you can find on Github: https://github.com/kgalic/MassTransitSample

Side note: If you are sending messages directly to the SB queue and want to deserialize them, as previously said, you need to specify the ContentType as it follows:

var message = new Message(UTF8Encoding.UTF8.GetBytes(request));
message.ContentType = "application/json"; //must have
await _senderClient.SendAsync(message);

In case you have something like this, you need to write the deserializer similar as for EventGridEvent, which you can use as an example.



回答2:

MassTransit encapsulates messages in a message envelope as documented and since the Event Grid messages aren't in that format, the error.

The JsonSerializer settings that you've set are used but the expected object on deserialize is a MessageEnvelope at this line.

I guess you have 2 ways to work around this

  1. Create and use a custom deserializer (similar to JsonMessageDeserializer) which deserializes into a simple JObject or Message

    UPDATE: On trying this further, its seems a lot more complex that I had originally thought and like Richard mentions in the other answer, you are probably better off using the Azure Service Bus client itself (or even a generic AMQP client if required)

  2. Trigger an Azure Function or Logic App to wrap the Event Grid payload into a Message Envelope for MassTransit to be able to deserialize it and then send it into the service bus queue



回答3:

I think the issue is that you are attempting to use MassTransit as a client library to consume generic json encoded messages, when in fact it is an opinionated application framework that defines and requires its own semi-private encoding schemes using a particular message envelope structure:

https://masstransittemp.readthedocs.io/en/latest/advanced/interop.html

To use Mass Transit, you must use the library in both the publishing and subscribing components, which would then use Azure Service Bus as the "Transport" for relaying the messages between them. The libraries will handle deserialization and will perform routing and exception handling using the data in the message envelope. As such, "vnd.masstransit" encoded messages are not intended to be produced or consumed by other frameworks such as EventGrid.

From having run selection processes for messaging libraries in the past, I would advise switching to a client library based on a generic protocol such as AMQP 1.0 instead (See https://github.com/Azure/amqpnetlite), and defining your own routing and exception patterns.