Derived types are not published to consumers in MassTransit

Posted 2019-05-28 12:01

I am having issues publishing generic messages for derived types and having the handler invoked using MassTransit v2.8.0.

If I publish a message of type HtmlBlockNewMessage, the consumer is never invoked. If I publish a ServiceBusMessage object and change the consumer to Consumes<ServiceBusMessage>.Context, the consumer is invoked.

The code fails for a derived type. It only works for the parent type (ServiceBusMessage).

Types:

[Serializable]
public class ServiceBusMessage
{
}

[Serializable]
public class ServiceBusResponse
{
    public int ResultCode { get; set; } 
}

// Request
[Serializable]
public class ContentItemMessage : ServiceBusMessage
{
    public string SiteId { get; set; }
    public string PageId { get; set; }
    public string ContentItemId { get; set; }
}

[Serializable]
public class HtmlBlockNewMessage : ContentItemMessage
{
    public string HtmlData { get; set; }
}

// Response
[Serializable]
public class ContentItemMessageResponse : ServiceBusResponse
{
    public string Name { get; set; }
    public string ItemType { get; set; }
    public string ItemHandler { get; set; }       
}

[Serializable]
public class HtmlBlockNewMessageResponse : ContentItemMessageResponse
{
    public string DataId { get; set; }
}

Consumer:

public class HtmlBlockConsumer : Consumes<HtmlBlockNewMessage>.Context
{
    private IHtmlDataService htmlDataService;
    public static ILogger Logger { get; set; }

    public HtmlBlockConsumer()
        : this(null)
    {
    }

    public HtmlBlockConsumer(IHtmlDataService htmlDataService)
    {
        Logger = Log4NetLogger.GetLogger();

        this.htmlDataService = htmlDataService ?? IoC.Container.Resolve<IHtmlDataService>();
    }

    public void Consume(IConsumeContext<HtmlBlockNewMessage> message)
    {
        // Do some stuff

        message.Respond(new HtmlBlockNewMessageResponse() { ResultCode = 1 } );
    }

}

Bus registration from publisher side:

        var bus = ServiceBusFactory.New(sbc =>
        {
            sbc.EnableMessageTracing();

            sbc.UseMsmq();
            sbc.VerifyMsmqConfiguration();
            sbc.UseMulticastSubscriptionClient();
            sbc.SetNetwork("Test");
            sbc.UseXmlSerializer();
            sbc.UseControlBus();

            sbc.ReceiveFrom("msmq://localhost/AuctionCMS.Web.Publisher");

            MtServiceBus.ValidateBus(sbc);
        });

        IoC.Container.RegisterInstance(bus);

Bus registration from consumer side:

        var bus = ServiceBusFactory.New(sbc =>
        {
            sbc.EnableMessageTracing();

            sbc.UseMsmq();
            sbc.VerifyMsmqConfiguration();
            sbc.UseMulticastSubscriptionClient();
            sbc.SetNetwork("Test");
            sbc.UseXmlSerializer();
            sbc.UseControlBus();

            sbc.ReceiveFrom("msmq://localhost/AuctionCMS.Consumer");

            sbc.Subscribe(subs =>
            {
                // These are being manually registered due to some issues getting
                // StructureMap to scan my assemblies
                subs.Instance(new HtmlBlockConsumer());
                subs.Instance(new BrowserConsumer());
                subs.Instance(new OfferConsumer());
            });
        });

        IoC.Container.RegisterInstance(bus);

My publish extension:

    public static TR Publish<T, TR>(this IServiceBus bus, T message) where T : ServiceBusMessage where TR : ServiceBusResponse
    {
        TR response = null;

        IoC.Container.Resolve<IServiceBus>().PublishRequest(message, callback =>
            {
                callback.SetTimeout(10.Seconds());

                try
                {
                    callback.Handle<TR>(m =>
                        {
                            response = m;
                        });
                }
                catch (Exception ex)
                {

                    throw;
                }
            });

        return response;
    }

Calling code:

// First I create a message specific to the type of action I am performing 

var message = new HtmlBlockNewMessage() { HtmlData = "Hello" }; 

// Then I call a function which accepts a ContentItemMessage and calls Publish

public void AddContentItem(ContentItemMessage message) 
{
    // Do some preprocessing

    // This call times out
    var response = this.auctionCmsServices.Bus.Publish<ContentItemMessage,
          ContentItemMessageResponse>(message);

    // Do some more processing

}

This is the exception:

[RequestTimeoutException: Timeout waiting for response, RequestId: 54910000-307f-20cf-c0c2-08d06b31cf6f]
   MassTransit.RequestResponse.RequestImpl`1.Wait() in d:\BuildAgent-03\work\aa063b4295dfc097\src\MassTransit\RequestResponse\RequestImpl.cs:124
   MassTransit.RequestResponseExtensions.PublishRequest(IServiceBus bus, TRequest message, Action`1 configureCallback) in d:\BuildAgent-03\work\aa063b4295dfc097\src\MassTransit\RequestResponseExtensions.cs:31
   AuctionCMS.Framework.ServiceBus.MtServiceBus.Publish(IServiceBus bus, T message) in c:\Users\rick\Documents\Visual Studio 2012\Projects\AuctionCMS\AuctionCMS.Framework\ServiceBus\MtServiceBus.cs:24
   AuctionCMS.Framework.Entity.Page.AddContentItem(ISite site, String zone, Int32 location, ContentItemMessage message) in c:\Users\rick\Documents\Visual Studio 2012\Projects\AuctionCMS\AuctionCMS.Framework\Entity\Page.cs:48
   AuctionCMS.Framework.Entity.Site.SetDefaultContent() in c:\Users\rick\Documents\Visual Studio 2012\Projects\AuctionCMS\AuctionCMS.Framework\Entity\Site.cs:117
   AuctionCMS.Web.Controllers.AdminSitesController.NewSite(SiteNewModel model, HttpPostedFileBase file) in c:\Users\rick\Documents\Visual Studio 2012\Projects\AuctionCMS\AuctionCMS.Web\Controllers\AdminSitesController.cs:69
   lambda_method(Closure , ControllerBase , Object[] ) +179
   System.Web.Mvc.ReflectedActionDescriptor.Execute(ControllerContext controllerContext, IDictionary`2 parameters) +261
   System.Web.Mvc.ControllerActionInvoker.InvokeActionMethod(ControllerContext controllerContext, ActionDescriptor actionDescriptor, IDictionary`2 parameters) +39
   System.Web.Mvc.Async.<>c__DisplayClass42.<BeginInvokeSynchronousActionMethod>b__41() +34
   System.Web.Mvc.Async.<>c__DisplayClass39.<BeginInvokeActionMethodWithFilters>b__33() +124
   System.Web.Mvc.Async.<>c__DisplayClass4f.<InvokeActionMethodFilterAsynchronously>b__49() +838059
   System.Web.Mvc.Async.<>c__DisplayClass4f.<InvokeActionMethodFilterAsynchronously>b__49() +838059
   System.Web.Mvc.Async.<>c__DisplayClass4f.<InvokeActionMethodFilterAsynchronously>b__49() +838059
   System.Web.Mvc.Async.<>c__DisplayClass4f.<InvokeActionMethodFilterAsynchronously>b__49() +838059
   System.Web.Mvc.Async.<>c__DisplayClass37.<BeginInvokeActionMethodWithFilters>b__36(IAsyncResult asyncResult) +15
   System.Web.Mvc.Async.<>c__DisplayClass2a.<BeginInvokeAction>b__20() +33
   System.Web.Mvc.Async.<>c__DisplayClass25.<BeginInvokeAction>b__22(IAsyncResult asyncResult) +838644
   System.Web.Mvc.<>c__DisplayClass1d.<BeginExecuteCore>b__18(IAsyncResult asyncResult) +28
   System.Web.Mvc.Async.<>c__DisplayClass4.<MakeVoidDelegate>b__3(IAsyncResult ar) +15
   System.Web.Mvc.Controller.EndExecuteCore(IAsyncResult asyncResult) +65
   System.Web.Mvc.Async.<>c__DisplayClass4.<MakeVoidDelegate>b__3(IAsyncResult ar) +15
   System.Web.Mvc.Controller.EndExecute(IAsyncResult asyncResult) +51
   System.Web.Mvc.<>c__DisplayClass8.<BeginProcessRequest>b__3(IAsyncResult asyncResult) +42
   System.Web.Mvc.Async.<>c__DisplayClass4.<MakeVoidDelegate>b__3(IAsyncResult ar) +15
   System.Web.Mvc.MvcHandler.EndProcessRequest(IAsyncResult asyncResult) +51
   System.Web.CallHandlerExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute() +606
   System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously) +288

Edit:

I went with a generic approach to solve this. It's a bit ugly from the caller's perspective, but it works.

    public TR AddContentItem<T, TR>(T message) where T : ContentItemMessage where TR : ContentItemMessageResponse
    {
        var response = this.auctionCmsServices.Bus.Publish<T, TR>(message);

        return response;
    }

The calling code now looks like this:

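// TR appears only in the return type, so the type arguments must be given explicitly
page.AddContentItem<HtmlBlockNewMessage, HtmlBlockNewMessageResponse>(
    new HtmlBlockNewMessage() { HtmlData = "This is some html" });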

Tags: masstransit
1 answer
混吃等死
#2 · 2019-05-28 12:31

Eugene's comment is correct. What's happening here is that you are publishing a message of type ContentItemMessage, because that is the type argument passed to Publish in AddContentItem. A consumer of HtmlBlockNewMessage will not be executed, since the message is published as a ContentItemMessage and a ServiceBusMessage, not as an HtmlBlockNewMessage. "MassTransit message mis-typing" is one of a number of write-ups out there on how this works.

Your options:

  1. Make AddContentItem generic, perhaps with a type constraint
  2. Use reflection to invoke Publish with the right type information
  3. Restructure how you publish things so this isn't an issue any more

The bottom line is you should always publish as the type you want received. Polymorphism in messaging is tricky.
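To make the static-type issue and option 2 concrete, here is a minimal sketch that does not touch MassTransit at all. PublishSketch and PublishAs are hypothetical stand-ins (not MassTransit APIs): PublishAs only reports which type argument it was bound to, which is roughly the information a generic publish call would have to work with. The message classes are trimmed copies of the ones in the question.

```csharp
using System;
using System.Reflection;

public class ServiceBusMessage { }
public class ContentItemMessage : ServiceBusMessage { }
public class HtmlBlockNewMessage : ContentItemMessage { }

public static class PublishSketch
{
    // Hypothetical stand-in for a generic publish call.
    // It returns the name of the type argument T it was bound to.
    public static string PublishAs<T>(T message) where T : ServiceBusMessage
    {
        return typeof(T).Name;
    }

    // Option 2: rebind T to the runtime type of the message via reflection.
    public static string PublishAsRuntimeType(ServiceBusMessage message)
    {
        MethodInfo open = typeof(PublishSketch).GetMethod("PublishAs");
        MethodInfo closed = open.MakeGenericMethod(message.GetType());
        return (string)closed.Invoke(null, new object[] { message });
    }

    public static void Main()
    {
        // Declared as the base type, like the parameter of AddContentItem.
        ContentItemMessage message = new HtmlBlockNewMessage();

        Console.WriteLine(PublishAs(message));            // prints "ContentItemMessage"
        Console.WriteLine(PublishAsRuntimeType(message)); // prints "HtmlBlockNewMessage"
    }
}
```

The first call binds T from the declared type of the variable, which mirrors the timeout in the question. The reflection call pays a runtime cost and loses compile-time checking, so making the calling method generic (option 1) is usually the simpler fix when the concrete type is known at the call site.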
