Why am I getting messages in the skipped queue?

Posted 2019-06-24 07:52

Question:

I have a saga setup in a fork/join configuration.

Events defined on the saga

  • FileMetadataMsg
  • FileReadyMsg
  • SomeOtherMsg

Process starts off when a file comes in on a separate listener.

  • Publishes SagaStart(correlationId)
  • Publishes FileSavedToMsg(correlationId, fileLoc)
  • Publishes FileMetadataMsg(correlationId, metadata)
  • Publishes FileReadyMsg(correlationId, fileLoc)

A downstream endpoint does some work on the file

Consumer<FileSavedToMsg>

  • Publishes SomeOtherMsg(FileSavedToMsg.correlationId, data)

I am getting a FileSavedToMsg in the saga_skipped queue. I can only assume it's due to having a correlationId on the FileSavedToMsg because the saga itself is not using FileSavedToMsg in its state machine and does not have an Event<FileSavedToMsg>.

If this is the reason why...should I be passing the correlationId along in a field other than the CorrelationId, so the saga doesn't see it? I need it somewhere so I can tag SomeOtherMsg with it.

Here is how the saga endpoint is defined

return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
    {
        epCfg.StateMachineSaga(machine, repository);
    });
});

Here is how the worker endpoint is defined

return Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
    {
        c.PrefetchCount = 1;
        c.Instance(_studyCreatedMsgConsumer);
    });
});

These are running on the same machine, but in separate Console/Topshelf applications.

Answer 1:

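If a queue receives messages that no consumer on that receive endpoint handles, there are two likely causes: either the endpoint previously consumed that message type and it was later removed from the consumer (or the saga, in your case), or the queue was used for some other purpose at some point and consumed that message type then. In both cases the exchange binding created at that time is still in place, so published messages of that type keep being routed to the queue.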

Either way, if you go into the RabbitMQ management console and look for the queue, you can expand the Bindings chevron, click to go to the exchange of the same name (that's a standard MassTransit convention), and then expand the bindings of the exchange to see which message types (the exchanges named like .NET type names) are bound to that exchange.
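The same bindings can also be read without the UI. The following is a minimal sketch, assuming the RabbitMQ management plugin is enabled on its default port 15672, the default "/" virtual host (URL-encoded as %2F), and the guest/guest credentials from the question. It asks the management HTTP API for every binding whose destination is the study_saga exchange and prints the raw JSON response.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

class ListEndpointBindings
{
    static async Task Main()
    {
        using (var http = new HttpClient())
        {
            // Assumption: default guest/guest account, as in the question.
            var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes("guest:guest"));
            http.DefaultRequestHeaders.Authorization =
                new AuthenticationHeaderValue("Basic", credentials);

            // Bindings in which the "study_saga" exchange is the destination.
            // Each "source" value in the JSON is a message-type exchange.
            var url = "http://localhost:15672/api/exchanges/%2F/study_saga/bindings/destination";

            Console.WriteLine(await http.GetStringAsync(url));
        }
    }
}
```

Each entry's source field names an exchange bound to the endpoint; any source that corresponds to a message type the saga no longer handles is a candidate for removal.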

If you see one that is not consumed by the endpoint, that's the culprit. You can Unbind it using the UI, after which messages published will no longer be sent to the queue.
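The unbind can also be done from code instead of the UI. This is a sketch only, assuming the RabbitMQ.Client package (5.x/6.x API, where channels are IModel) and a hypothetical namespace MyApp.Contracts for FileSavedToMsg. MassTransit names message-type exchanges as Namespace:TypeName, so replace the source name with whatever actually appears in the bindings list.

```csharp
using RabbitMQ.Client;

class UnbindStaleMessageType
{
    static void Main()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",   // credentials taken from the question
            Password = "guest"
        };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Remove the exchange-to-exchange binding that routes the
            // message type to the saga endpoint's exchange.
            channel.ExchangeUnbind(
                destination: "study_saga",
                source: "MyApp.Contracts:FileSavedToMsg", // hypothetical namespace
                routingKey: "",   // assumes the default binding with an empty routing key
                arguments: null);
        }
    }
}
```

If the saga endpoint is later configured to consume that message type again, the binding should be recreated when the bus starts.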



Tags: masstransit