TL;DR: When subscribing to a contract (an interface), how can I get the raw message content or the originally published object, rather than a dynamic proxy?
I am having a bad time trying to create a modular application based on MassTransit.
My idea is to have a WebSocket server connected to a queue. It reads events from the sockets and inserts them into the queue as "connection requests", and it reads events from the queue and sends them to the sockets as "connection events". Both have a contract that lets the WS server know which connection an event is going to, and lets the rest of the system know where a request is coming from:
public interface IConnectionRequest
{
    String ConnectionId { get; set; }
}

public interface IConnectionEvent
{
    String ConnectionId { get; set; }
}
There is an object that maintains the session and other data. This object accepts requests (such as action requests or subscription requests) and pushes events, either as a result of those requests or simply because the state changed. I want to create objects that listen for a particular event or set of events and perform actions on the state, so I created this contract:
public interface IConnectionRequestHandler<T> : Consumes<T>.Selected
    where T : class, IConnectionRequest
{
}
For example, I want to create a handler that creates the actual session on the server and replies to the connection when the session is ready. I create one object that represents the request, another for the event, and the handler itself:
public class CreateSessionRequest : IConnectionRequest
{
    public String ConnectionId { get; set; }
}

public class CreatedSessionEvent : IConnectionEvent
{
    public String ConnectionId { get; set; }
    public Guid SessionId { get; set; }
}

public class CreateSessionEventHandler : IConnectionRequestHandler<CreateSessionRequest>
{
    IServiceBus _bus;

    public CreateSessionEventHandler(IServiceBus bus)
    {
        _bus = bus;
    }

    public bool Accept(CreateSessionRequest message)
    {
        return true;
    }

    public void Consume(CreateSessionRequest message)
    {
        // do stuff, create the session
        var evt = new CreatedSessionEvent() { SessionId = Guid.NewGuid(), ConnectionId = message.ConnectionId };
        _bus.Publish(evt, evt.GetType());
    }
}
Now, for testing purposes, I wrote this code that emulates the scenario. Basically, it creates a communication bus and subscribes the request handler:
var bus = ServiceBusFactory.New(sbc =>
{
    sbc.ReceiveFrom("loopback://localhost/queue");
});
bus.SubscribeInstance<CreateSessionEventHandler>(new CreateSessionEventHandler(bus));
Then, simulating the WebSocket server, I wrote the part that reads a request from the WS and sends it to the queue:
IConnectionRequest e = new CreateSessionRequest() { ConnectionId = "myId" };
bus.Publish(e, e.GetType());
And now the part that is supposed to listen for events from the queue and forward them to the appropriate connection:
bus.SubscribeHandler<IConnectionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
    evt.GetType().Name,
    evt.ConnectionId));
But this last part does not work as expected. The object I get in the subscription is not my original event; it is a dynamic proxy, DynamicImpl.IConnectionEvent, so I cannot serialize this object to JSON, since the output would only contain the members of IConnectionEvent.
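To illustrate what I mean without involving MassTransit at all, here is a minimal sketch. ConnectionEventProxy is a hypothetical, hand-written stand-in for the generated proxy (I am assuming the real DynamicImpl.IConnectionEvent behaves similarly, that is, it implements only the interface). Describe is also just a helper made up for this example; it lists the public properties that a reflection-based serializer would see on the runtime type.

```csharp
using System;
using System.Linq;

public interface IConnectionEvent
{
    string ConnectionId { get; set; }
}

public class CreatedSessionEvent : IConnectionEvent
{
    public string ConnectionId { get; set; }
    public Guid SessionId { get; set; }
}

// Hypothetical stand-in for the generated proxy: it implements only the
// interface, so it has no SessionId member at all.
public class ConnectionEventProxy : IConnectionEvent
{
    public string ConnectionId { get; set; }
}

public static class Program
{
    // Lists the public properties of the runtime type, which is roughly what
    // a reflection-based JSON serializer would write out.
    public static string Describe(object message)
    {
        var names = message.GetType()
            .GetProperties()
            .Select(p => p.Name)
            .OrderBy(n => n, StringComparer.Ordinal);
        return message.GetType().Name + ": " + string.Join(", ", names);
    }

    public static void Main()
    {
        // What I publish: both properties are visible.
        Console.WriteLine(Describe(new CreatedSessionEvent { ConnectionId = "myId", SessionId = Guid.NewGuid() }));
        // What I seem to receive: only the interface member is left.
        Console.WriteLine(Describe(new ConnectionEventProxy { ConnectionId = "myId" }));
    }
}
```

With the original object, SessionId is available to serialize; with an interface-only object like the stand-in, it is simply gone.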
If I specify the type in the subscription, it works:
bus.SubscribeHandler<CreatedSessionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
    evt.GetType().FullName,
    evt.ConnectionId));
But that means that for each new event type, I have to touch the WebSocket server to register it.
Is there a way of avoiding this?