Multiple consumers for the same message through Un

Posted 2020-03-03 04:25

I'm having a lot of problems lately because of what seems to be a bug in the MassTransit.UnityIntegration package, primarily due to the fact that registration names are not being considered.

For instance, if I register my classes like this:

var container = new UnityContainer()
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1")
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3");

A few lines later, I use the LoadFrom extension method to get the registered consumers in the container like this:

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(_s => _s.LoadFrom(container));
    });

What happens is that my handlers are never called when the associated messages hit the bus.

After pondering for a while, I decided to take a look at the implementation and it became clear why this happens:

This is the main code inside the LoadFrom method:

public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container)
{
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>());
    if (concreteTypes.Count > 0)
    {
        var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container);

        foreach (Type concreteType in concreteTypes)
            consumerConfigurator.ConfigureConsumer(concreteType);
    }

    ...

}

Notice that it only finds the types and does not pass along any information about the registration names. This is the FindTypes<T> implementation:

static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter)
{
    return container.Registrations
                        .Where(r => r.MappedToType.Implements<T>())
                        .Select(r => r.MappedToType)
                        .Where(filter)
                        .ToList();
}

After a few indirections, it all comes down to this single line, inside the UnityConsumerFactory<T> class, that actually creates the instance of the consumer:

var consumer = childContainer.Resolve<T>();

This absolutely will not work with Unity when there are multiple registrations, because the only way to register (and then resolve) multiple implementations in Unity is to give each one a name in the RegisterType call and then specify that name in the Resolve call.
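
To make the naming rule concrete, here is a minimal sketch of how named registrations behave in Unity. IHandler and the two empty handler classes are hypothetical stand-ins for the consumer types above, and the sketch assumes the Microsoft.Practices.Unity package is referenced.

```csharp
using System;
using System.Linq;
using Microsoft.Practices.Unity;

// Hypothetical stand-ins for the consumer interface and its implementations.
public interface IHandler { }
public sealed class Handler1 : IHandler { }
public sealed class Handler3 : IHandler { }

public static class NamedResolutionDemo
{
    public static void Main()
    {
        IUnityContainer container = new UnityContainer()
            .RegisterType<IHandler, Handler1>("Handler1")
            .RegisterType<IHandler, Handler3>("Handler3");

        // A named registration is resolved by passing the same name.
        IHandler first = container.Resolve<IHandler>("Handler1");
        Console.WriteLine(first.GetType().Name);

        // ResolveAll returns one instance per named registration.
        Console.WriteLine(container.ResolveAll<IHandler>().Count());

        // There is no default (unnamed) registration for the interface,
        // so resolving it without a name fails.
        try
        {
            container.Resolve<IHandler>();
        }
        catch (ResolutionFailedException)
        {
            Console.WriteLine("No default registration for IHandler");
        }
    }
}
```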

Perhaps I'm missing something completely basic in all this and the error is on my part? The source for the MassTransit Unity components can be found here. I did not look into the code for the other containers because I'm not familiar with them, but I assume this has been handled in some way? I think having more than one consumer for the same message type inside the same container is actually quite common.

In this particular case, it would be better to pass along not only the Type from the registration in the container, but also the Name used for the registration.
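
Carrying the name along could look roughly like the sketch below. NamedRegistration and RegistrationScanner are hypothetical helpers, not part of MassTransit.UnityIntegration, and the plain IsAssignableFrom check replaces the Implements<T>() extension used in the original code. Only the Registrations, RegisteredType, MappedToType and Name members and the Resolve(Type, string) call come from Unity itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Practices.Unity;

// Hypothetical helper: keeps the registration key (type plus name) together.
public sealed class NamedRegistration
{
    public NamedRegistration(Type registeredType, string name)
    {
        RegisteredType = registeredType;
        Name = name;
    }

    public Type RegisteredType { get; private set; }

    // Null for the default (unnamed) registration.
    public string Name { get; private set; }
}

public static class RegistrationScanner
{
    // Finds registrations whose mapped type implements T, but returns the
    // registered type and name instead of the mapped concrete type.
    public static IList<NamedRegistration> FindRegistrations<T>(IUnityContainer container)
    {
        return container.Registrations
            .Where(r => typeof(T).IsAssignableFrom(r.MappedToType))
            .Select(r => new NamedRegistration(r.RegisteredType, r.Name))
            .ToList();
    }

    // Resolves through the same key that was used at registration time.
    public static object ResolveRegistration(IUnityContainer container, NamedRegistration registration)
    {
        return container.Resolve(registration.RegisteredType, registration.Name);
    }
}
```

A consumer factory built on top of this would call ResolveRegistration instead of Resolve<T>(), so every named registration gets its own factory.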

Update

Well the problem is a bit more clear now that Travis took the time to explain it. I should have noticed it earlier.

It seems I should be registering the types directly for them to be correctly resolved inside the factory, like this:

var container = new UnityContainer()
    .RegisterType<Handler1>()
    .RegisterType<Handler3>();

With this approach, I can also omit the registration name, since now their build keys inside the container are different.

Well, this would work perfectly if this was our real scenario, but it isn't. Let me explain what exactly we are doing:

Before we started using MassTransit, we already had an interface used for the command pattern, called ICommandHandler<TCommand>, where TCommand is a base model for commands in the system. When we started considering the use of a service bus, it was clear from the get go that it should be possible to switch later on to another service bus implementation without much hassle. With that in mind, I proceeded to create an abstraction over our commanding interface to behave like one of the consumers that MT expects. This is what I came up with:

public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All
    where T : class, ICommand
{
    private readonly ICommandHandler<T> _commandHandler;

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
    {
        _commandHandler = commandHandler;
    }

    public void Consume(T _message)
    {
        _commandHandler.Handle(_message);
    }
}

It's a very simple adapter class. It receives an ICommandHandler<T> implementation and makes it behave like a Consumes<T>.All instance. It was unfortunate that MT required message models to be classes, since we did not have that constraint on our commands, but that was a small inconvenience, and we proceeded to add the where T : class constraint to our interfaces.

Then, since our handler interfaces were already registered in the container, it would be a matter of registering the MT interface with this adapter implementation and letting the container inject the real implementations over it. For instance, a more realistic example (taken straight from our code base):

.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor")
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>()
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder")))
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor")))

The named registrations there are a bit convoluted but required, since we now have two consumers for the same message. Although not as clean as we hoped, we could live with that, since it decouples our code from MassTransit-specific logic: the adapter class is in a separate assembly, referenced ONLY by the final layer in the system, for container registration purposes. It seemed like a very nice idea, but it is now confirmed to be unsupported by the lookup logic behind the container integration classes.

Notice that I'm unable to register the concrete classes here, since there is a generic adapter class in the middle.

Update 2:

After following Travis' advice, I tried this simple code which also does not work (I fail to see why, as it seems perfectly valid). It's an explicit consumer factory registration without any automatic container integration:

_sbc.Consume(() => container.Resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

That resolve call correctly gives me the previously registered CommandHandlerToConsumerAdapter<ApplicationInstallationCommand> instance, which implements the Consumes<ApplicationInstallationCommand>.All, which in turn should be one of THE base interfaces supported. Publishing an ApplicationInstallationCommand right after this does nothing, as if the handler was invalid or something similar.

This works though:

_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.Resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

It is clear that something deep down in the API inspects the compile-time type in a non-generic way, instead of relying on the generic interface.

I mean... it is workable with this, but the registration code is getting convoluted for no apparent reason (due to what I would consider 'non-standard implementation details' on MT's part). Maybe I'm just grasping at straws here? Perhaps this all boils down to 'why does MT not accept its own, already generic, interface?' Why does it need the concrete type at compile time to see that it is a message handler, even though the instance that I'm passing to it is typed as Consumes<X>.All, also at compile time?
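
One possible explanation for the difference, offered as a sketch and not verified against the MassTransit source: a generic registration method only sees the type argument inferred from the factory lambda, and reflecting over an interface type with GetInterfaces() does not return that interface itself. IConsumes<T>, Adapter and HandledMessageTypes below are hypothetical stand-ins, not MassTransit types.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for Consumes<T>.All.
public interface IConsumes<T>
{
    void Consume(T message);
}

// Hypothetical stand-in for the adapter class.
public sealed class Adapter : IConsumes<string>
{
    public void Consume(string message) { }
}

public static class InferenceDemo
{
    // Mimics a registration method that discovers message types by
    // reflecting over the compile-time type argument only.
    public static string[] HandledMessageTypes<TConsumer>(Func<TConsumer> factory)
    {
        return typeof(TConsumer)
            .GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IConsumes<>))
            .Select(i => i.GetGenericArguments()[0].Name)
            .ToArray();
    }

    public static void Main()
    {
        IConsumes<string> asInterface = new Adapter();

        // TConsumer is inferred as IConsumes<string>. GetInterfaces() on an
        // interface type lists only its base interfaces, so nothing is found.
        Console.WriteLine(HandledMessageTypes(() => asInterface).Length); // prints 0

        // TConsumer is inferred as Adapter, whose interface list contains
        // IConsumes<string>.
        Console.WriteLine(HandledMessageTypes(() => (Adapter)asInterface).Length); // prints 1
    }
}
```

If MassTransit discovers consumer interfaces in a similar way, the cast changes the inferred type argument and with it the set of interfaces that reflection can find.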

Update 3:

After discussing with Travis below, I decided to drop the UnityIntegration assembly completely and go with standalone Consumer calls on the subscription.

I've created a small extension class in our MassTransit specific assembly to facilitate things:

public static class CommandHandlerEx
{
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler)
        where T : class, ICommand
    {
        return new CommandHandlerToConsumerAdapter<T>(_handler);
    }
}

And finally registered the handlers like this:

var container = new UnityContainer()
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor");

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(RegisterConsumers);
    });

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s)
{
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer());
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer());
}

After using the whole day yesterday to try working things out, I strongly suggest that you stay away from the container extension assemblies if you want expected behavior out of the container and/or if you want to customize the classes etc (like I did to decouple our messaging classes from MT specific code) for 2 main reasons:

  1. The logic in the extensions traverses the registrations in the container to find the consumer classes. This is, in my opinion, terrible design. If something wants an implementation from the container, it should just call Resolve or ResolveAll on its interface (or their equivalent in non-Unity terms), without caring about what exactly is registered and what the concrete types are. This can have serious consequences with code that assumes the container can return types that were not explicitly registered. Luckily it's not the case with these classes, but we do have a container extension that automatically creates decorator types based on the build key, and they don't need to be explicitly registered on the container.

  2. The consumer registration uses the MappedToType property on the ContainerRegistration instance to call Resolve on the container. This is just completely wrong in any situation, not just in MassTransit's context. Types in Unity are either registered as a mapping (like in the excerpts above, with a From and a To component) or directly as a single concrete type. In BOTH cases the logic should use the RegisteredType property to resolve from the container. The way it works now is that, if you happen to register the handlers with their interfaces, MT will completely bypass your registration logic and call Resolve on the concrete type instead, which works in Unity out of the box. This can cause unpredictable behavior: for instance, you think the consumer should be a singleton like you registered it, but it ends up being a transient object (the default) instead.

Looking back at it now, I can see it was much more complicated than I originally believed. There was quite a bit of learning in the process too, so that's good.

Update 4:

Yesterday I decided to refactor the whole adapter approach a bit before making the final checkin. I went with MassTransit's interface pattern to create my adapters too, because I think that is a very nice and clean syntax.

Here is the result:

public sealed class CommandHandlerToConsumerAdapter<T>
    where T : class, ICommand
{
    public sealed class All : Consumes<T>.All
    {
        private readonly ICommandHandler<T> m_commandHandler;

        public All(ICommandHandler<T> _commandHandler)
        {
            m_commandHandler = _commandHandler;
        }

        public void Consume(T _message)
        {
            m_commandHandler.Handle(_message);
        }
    }
}

Unfortunately, this breaks MassTransit's code because of an unhandled exception in a utility method of the referenced Magnum library, an extension method called ToShortTypeName.

Here is the exception:

ArgumentOutOfRangeException in MassTransit receive

at System.String.Substring(Int32 startIndex, Int32 length)
at Magnum.Extensions.ExtensionsToType.ToShortTypeName(Type type)
at MassTransit.Pipeline.Sinks.ConsumerMessageSink2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext1 context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\ConsumerMessageSink.cs:line 51
at MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c__DisplayClass2.<>c__DisplayClass4.b__1(IConsumeContext x) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line 45
at MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Context\ServiceBusReceiveContext.cs:line 162

1 answer
我命由我不由天
Reply #2 · 2020-03-03 05:21

While I don't know the Unity integration, with all the containers, you must register your consumers as the concrete type in the container and not the Consumes<> interfaces. I assume it's just RegisterType<Handler1, Handler1>() but I'm not totally sure on that.

If you don't like the LoadFrom extension for your container, you don't need to use it anyway. You can always just resolve the consumers yourself and register them via _sbc.Consume(() => container.Resolve<YourConsumerType>()) in the configuration instead. The LoadFrom extension is just a convenience for people who are using the container in a common way.

The following code works. It uses the container the way I would expect one to use it, without knowing more about your domain. If you want to understand how messages are bound a little bit better, I'd suggest using RabbitMQ, because you can easily see where things end up by following the exchange bindings. At this point, this is well beyond an SO question; I'd bring it to the mailing list if you have anything further.

using System;
using MassTransit;
using Microsoft.Practices.Unity;

namespace MT_Unity
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var container = new UnityContainer()
                .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>()
                .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>())

            using (IServiceBus consumerBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/consumer");
                        sbc.UseRabbitMq();


                        sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>()));
                    }))
            using (IServiceBus publisherBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/publisher");
                        sbc.UseRabbitMq();
                    }))
            {
                publisherBus.Publish(new MyCommand());

                Console.ReadKey();
            }
        }
    }

    public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand
    {
        private readonly ICommandHandler<T> _commandHandler;

        public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
        {
            _commandHandler = commandHandler;
        }

        public void Consume(T message)
        {
            _commandHandler.Handle(message);
        }
    }

    public interface ICommand { }
    public class MyCommand : ICommand { }

    public interface ICommandHandler<T> where T : class, ICommand
    {
        void Handle(T message);
    }

    public class MyCommandHandler : ICommandHandler<MyCommand>
    {
        public MyCommandHandler()
        {

        }
        public void Handle(MyCommand message)
        {
            Console.WriteLine("Handled MyCommand");
        }
    }

}