MassTransit: Message contracts, polymorphism and d

Posted 2019-09-11 14:49

TL;DR When subscribing to a contract, how can I get the raw message content or the original published object, rather than a dynamic proxy?

I am having a hard time trying to create a modular application based on MassTransit.

My idea is to have a WebSocket server connected to a queue. It reads events from the socket and inserts them into the queue as "connection requests", and it reads events from the queue and sends them to the sockets as "connection events". Both have a contract that lets the WS server know which connection an event is going to, and lets the rest of the system know where it is coming from:

public interface IConnectionRequest
{
    String ConnectionId { get; set; }
}

public interface IConnectionEvent
{
    String ConnectionId { get; set; }
}

There is an object that maintains the session and other data. This object accepts requests (like action requests or subscription requests) and pushes events as a result of the requests or just because the state changed. I want to create objects that listen for a particular event or set of events and perform actions on the state, so I created this contract:

public interface IConnectionRequestHandler<T> : Consumes<T>.Selected
    where T : class, IConnectionRequest
{
}

For example, I want to create a handler that creates the actual session in the server and replies to the connection, notifying it when the session is ready. I create one object that represents the request, another for the event, and the handler itself.

public class CreateSessionRequest : IConnectionRequest
{
    public String ConnectionId { get; set; }
}

public class CreatedSessionEvent : IConnectionEvent
{
    public String ConnectionId { get; set; }
    public Guid SessionId { get; set; }
}

public class CreateSessionEventHandler : IConnectionRequestHandler<CreateSessionRequest>
{
    IServiceBus _bus;

    public CreateSessionEventHandler(IServiceBus bus)
    {
        _bus = bus;
    }

    public bool Accept(CreateSessionRequest message)
    {
        return true;
    }

    public void Consume(CreateSessionRequest message)
    {
        // do stuff, create the session
        var evt = new CreatedSessionEvent() { SessionId = Guid.NewGuid(), ConnectionId = message.ConnectionId };
        _bus.Publish(evt, evt.GetType());
    }
}

Now, for testing purposes, I wrote this code that emulates the scenario. Basically, it creates a communication bus and subscribes the request handler:

var bus = ServiceBusFactory.New(sbc =>
{
    sbc.ReceiveFrom("loopback://localhost/queue");
});

bus.SubscribeInstance<CreateSessionEventHandler>(new CreateSessionEventHandler(bus));

Then, simulating the WebSocket server, I write the part that reads from the WS and sends it to the queue:

IConnectionRequest e = new CreateSessionRequest() { ConnectionId = "myId" };
bus.Publish(e, e.GetType());

And now the part that is supposed to listen for events from the queue and forward them to the appropriate connection:

bus.SubscribeHandler<IConnectionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
                                                                evt.GetType().Name,  
                                                                evt.ConnectionId));

But this last part does not work as expected. The object I get in the subscription is not my original event; it is a dynamic proxy, DynamicImpl.IConnectionEvent, so I cannot serialize this object to JSON, since it would only contain the members of IConnectionEvent.

If I specify the type in the subscription, it works:

bus.SubscribeHandler<CreatedSessionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
                                                                evt.GetType().FullName,  
                                                                evt.ConnectionId));

But that means that for each new event, I have to touch the WebSocket server to register the new type.

Is there a way of avoiding this?

1 answer
Bombasti
#2 · 2019-09-11 15:15

TL;DR - MT only supports data on the contract for most serializers. There are a bunch of reasons for this, but if you want a non-proxy type then you need to use the binary serializer.
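
As a rough configuration sketch of that option, the bus from the question could be switched to the binary serializer along these lines. This assumes the MassTransit 2.x `UseBinarySerializer()` configuration extension and that, being based on .NET binary formatting, it needs the message classes marked `[Serializable]`; check both against the version you are running. `BusSetup` is a hypothetical name used only for illustration.

```csharp
using System;
using MassTransit;

// Contract from the question, repeated so the sketch stands alone.
public interface IConnectionEvent
{
    String ConnectionId { get; set; }
}

// Assumption: binary serialization requires [Serializable] on concrete messages.
[Serializable]
public class CreatedSessionEvent : IConnectionEvent
{
    public String ConnectionId { get; set; }
    public Guid SessionId { get; set; }
}

// Hypothetical helper class, not part of MassTransit.
public static class BusSetup
{
    public static IServiceBus Create()
    {
        return ServiceBusFactory.New(sbc =>
        {
            sbc.ReceiveFrom("loopback://localhost/queue");

            // Assumption: MassTransit 2.x extension that replaces the default
            // serializer with the binary one for this bus.
            sbc.UseBinarySerializer();
        });
    }
}
```

Every process that publishes or consumes these messages would presumably need the same serializer setting and access to the assembly containing the concrete types.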

XML and JSON serialization (XML actually just uses the JSON serializer under the hood; oddly, it's faster) generates a proxy for all requests if it can. The subscribed type is the contract you're using, and expecting to interrogate the object for more details that are not in the contract leads to a ton of complexity. We suggest you avoid doing that.

So that would mean you would need to go to the websocket server for each type of message it is expected to consume. If the only thing you are doing is forwarding the message off, there's a SignalR backplane someone did (https://github.com/mdevilliers/SignalR.RabbitMq I think is one) that might be a better fit than MT consumers.
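
If you stay with MT consumers, the per-type registration can at least be kept to one line per event. The following is only a sketch: `ConnectionEventForwarding` and `Forward<T>` are hypothetical names, and the only MassTransit call used is the `SubscribeHandler<T>` already shown in the question.

```csharp
using System;
using MassTransit;

// Contract from the question, repeated so the sketch stands alone.
public interface IConnectionEvent
{
    String ConnectionId { get; set; }
}

// Hypothetical helper, not part of MassTransit.
public static class ConnectionEventForwarding
{
    // Subscribes to one concrete event type, so the handler receives that
    // type with all of its members instead of an IConnectionEvent proxy.
    public static void Forward<T>(IServiceBus bus)
        where T : class, IConnectionEvent
    {
        bus.SubscribeHandler<T>(evt =>
            Console.WriteLine("Sending event '{0}' to connection: {1}",
                              typeof(T).Name,
                              evt.ConnectionId));
    }
}
```

With the types from the question, the websocket server would call `ConnectionEventForwarding.Forward<CreatedSessionEvent>(bus);` once for each event type it should forward.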
