MassTransit 2.6.1 Request/Response pattern - Respo

Posted 2019-04-10 06:10

Question:

I'm looking at MassTransit as a ServiceBus implementation to use in a web project.

I am playing with the Request/Response pattern and am seeing a long delay between the consumer receiving the message and responding, and the request publisher handling the response. Sometimes it seems as though the response will never arrive: after leaving it running for 10 minutes, the response had still not come through. The only times I have seen the handler delegate called with the response are after the 30-second timeout period has elapsed and the timeout exception has been thrown; in that situation, the breakpoint set on the handler delegate is hit.

The setup is standard: a web app publishes requests, a console app consumes them and sends responses, and the web app handles the responses in a callback.

I'm using Castle Windsor, and the container is initialized in the web project using WebActivator:

[assembly: WebActivator.PreApplicationStartMethod(typeof(BootStrapper), "PreStart")]
[assembly: WebActivator.PostApplicationStartMethod(typeof(BootStrapper), "PostStart")]
[assembly: WebActivator.ApplicationShutdownMethodAttribute(typeof(BootStrapper), "Stop")]

namespace Web.App_Start
{
    public static class BootStrapper
    {
        internal static IWindsorContainer Container { get; private set; }

        public static void PreStart()
        {
            Container = new WindsorContainer().Install(FromAssembly.This());
        }

        public static void PostStart()
        {
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            BundleConfig.RegisterBundles(BundleTable.Bundles);

            ApiConfig.Configure(Container);
            MvcConfig.Configure(Container);
        }

        public static void Stop()
        {
            if (Container != null)
                Container.Dispose();
        }
    }
}

In the web app project (an ASP.NET Web API project), the WindsorInstaller for MassTransit looks like

public class MassTransitInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(AllTypes.FromThisAssembly().BasedOn<IConsumer>());

        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();            
            configurator.UseMulticastSubscriptionClient();

            configurator.ReceiveFrom("msmq://localhost/web");

            configurator.EnableMessageTracing();
            configurator.Subscribe(x => x.LoadFrom(container));
        });

        container.Register(Component.For<IServiceBus>().Instance(bus));
    }
}

In the console app project, the WindsorInstaller looks like

public class MassTransitInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(AllTypes.FromAssemblyContaining<BasicRequestCommandHandler>().BasedOn<IConsumer>());

        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();
            configurator.UseMulticastSubscriptionClient();

            configurator.ReceiveFrom("msmq://localhost/console");

            configurator.Subscribe(x => x.LoadFrom(container));
        });

        container.Register(Component.For<IServiceBus>().Instance(bus));
    }
}

I have an ApiController with the following GET action method

public class HelloController : ApiController
{
    private readonly IServiceBus _bus;

    public HelloController(IServiceBus bus)
    {
        _bus = bus;
    }

    // GET api/hello?text={some text}
    public Task<IBasicResponseCommand> Get(string text)
    {
        var command = new BasicRequestCommand {Text = text};

        var tcs = new TaskCompletionSource<IBasicResponseCommand>();

        _bus.PublishRequest(command, c =>
        {
            c.Handle<IBasicResponseCommand>(r =>
            {
                tcs.SetResult(r);
            });
        });

        return tcs.Task;
    }
}

BasicRequestCommand and BasicResponseCommand look like so

public interface IBasicRequestCommand
{
    Guid CorrelationId { get; set; }
    string Text { get; set; }
}

public class BasicRequestCommand :
    CorrelatedBy<Guid>, IBasicRequestCommand
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }

    public BasicRequestCommand()
    {
        CorrelationId = Guid.NewGuid();
    }
}

public interface IBasicResponseCommand
{
    Guid CorrelationId { get; set; }
    string Text { get; set; }
}

public class BasicResponseCommand :
    CorrelatedBy<Guid>, IBasicResponseCommand
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }
}

And the handler responding to the BasicRequestCommand in the console app:

public class BasicRequestCommandHandler : Consumes<IBasicRequestCommand>.Context
{
    public void Consume(IConsumeContext<IBasicRequestCommand> context)
    {
        Console.Out.WriteLine("received message text " + context.Message.Text);

        context.Respond(new BasicResponseCommand { Text = "Hello " + context.Message.Text, CorrelationId = context.Message.CorrelationId });
    }
}

With all of this running locally, I expected a request/response round trip to take a few seconds at most. Am I missing something in the configuration?

In addition, I wanted to hook MassTransit up to log4net. I am using Windsor's log4net logging facility and have a log4net section in web.config. This is all working fine for ILogger implementations provided by Windsor (and also for NHibernate logging), but it's not clear from the documentation how to configure MassTransit to use this for logging. Any ideas?

Answer 1:

As Andrei Volkov and Chris Patterson discussed on the MassTransit Google group, this issue seems to stem from MassTransit switching to SynchronizationContext, which for some reason does not work as expected.

For the time being, the workarounds seem to be switching to async MassTransit requests or going back to v2.1.1, which does not use the offending SynchronizationContext.

(I will post updates on this issue here for posterity if no one else does so first.)



Answer 2:

The response timeout issue for Request/Response in ASP.NET is fixed in version 2.6.2. https://groups.google.com/d/topic/masstransit-discuss/oC1FOe6KsAU/discussion



Answer 3:

As you're using the multicast subscription client (UseMulticastSubscriptionClient), you must call SetNetwork(NETWORK_KEY) on each machine, using the same value for NETWORK_KEY. Also, all participating machines need to be on the same subnet; see the documentation at http://masstransit.readthedocs.org/en/latest/overview/subscriptions.html#msmq-multicast
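
As a rough sketch of that suggestion, the bus configuration from the question could set the network key alongside the multicast subscription client. This assumes SetNetwork is available on the configurator in the MassTransit 2.x version in use. The BusConfig class, the CreateBus method and the key value "my-network" are placeholders invented for illustration, and the same key would have to be used in both the web app and the console app.

```csharp
using MassTransit;

// Hypothetical helper class; not part of the original post.
public static class BusConfig
{
    // Placeholder value: every participating process must use the same key.
    private const string NetworkKey = "my-network";

    public static IServiceBus CreateBus(string queueUri)
    {
        return ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();
            configurator.UseMulticastSubscriptionClient();

            // Assumed call, per the answer: sets the network key shared by all participants.
            configurator.SetNetwork(NetworkKey);

            // For example "msmq://localhost/web" or "msmq://localhost/console".
            configurator.ReceiveFrom(queueUri);
        });
    }
}
```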

For hooking up log4net, it depends on which version you're using, but in the latest versions you include the MassTransit.Log4NetIntegration assembly and then call cfg.UseLog4Net(); in your service bus configuration.
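
A minimal sketch of what that might look like, assuming the MassTransit.Log4NetIntegration assembly is referenced and that the UseLog4Net extension lives in a namespace of the same name (check this against the version you have installed). The LoggingBusConfig class and CreateBus method are hypothetical names used only for this example.

```csharp
using MassTransit;
using MassTransit.Log4NetIntegration; // assumed namespace of the UseLog4Net extension

// Hypothetical helper class; not part of the original post.
public static class LoggingBusConfig
{
    public static IServiceBus CreateBus(string queueUri)
    {
        return ServiceBusFactory.New(configurator =>
        {
            // Route MassTransit's internal logging to log4net. log4net itself is
            // assumed to be configured elsewhere, for example by the web.config
            // section mentioned in the question.
            configurator.UseLog4Net();

            configurator.UseMsmq();
            configurator.ReceiveFrom(queueUri);
        });
    }
}
```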

If you're still stuck, you could ask the MT mailing list at https://groups.google.com/forum/?fromgroups#!forum/masstransit-discuss