I have a saga setup in a fork/join configuration.
Events defined on the saga
FileMetadataMsg
FileReadyMsg
SomeOtherMsg
Process starts off when a file comes in on a separate listener.
- Publishes
SagaStart(correlationId)
- Publishes
FileSavedToMsg(correlationId, fileLoc)
- Publishes
FileMetadataMsg(correlationId, metadata)
- Publishes
FileReadyMsg(correlationId, fileLoc)
Downstream endpoint of does some work on the file
Consumer<FileSavedToMsg>
- Publishes
SomeOtherMsg(GotTheFileMsg.correlationId, data)
I am getting a FileSavedToMsg in the saga_skipped queue. I can only assume it's due to having a correlationId on the FileSavedToMsg because the saga itself is not using FileSavedToMsg in its state machine and does not have an Event<FileSavedToMsg>
.
If this is the reason why...should I be passing the correlationId along in a field other than the CorrelationId, so the saga doesn't see it? I need it somewhere so I can tag SomeOtherMsg with it.
Here is how the saga endpoint is defined
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
{
epCfg.StateMachineSaga(machine, repository);
});
});
Here is how the worker endpoint is defined
return Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
{
c.PrefetchCount = 1;
c.Instance(_studyCreatedMsgConsumer);
});
});
These are running on the same machine, but in seperate Console/Topshelf applications.