MassTransit saga/statemachine not returning the re

Posted 2019-02-20 14:36

I feel like I'm so close to getting this working but can't seem to get it over the line...

I have an ASP.NET Core application with a saga/state machine that seems to work well for the most part. It:

  1. Receives a request
  2. Publishes an event that is picked up by a routing slip
  3. When the routing slip completes it publishes an event
  4. The event is picked up by the saga
  5. The saga then sends a Request to a request/response consumer
  6. The response comes back to the saga
  7. BUT THEN I use RespondAsync to try to send a response back to the original calling controller, and nothing goes back

My controller looks like:

private readonly IRequestClient<IRequestLink, IRequestLinkCompleted> _client;

public async Task<IActionResult> CreateLinkAsync([FromBody]CreateLinkRequest request)
{
    var requestLink = new RequestLink
    {
        GroupName = $"{request.Name} Group",
        Name = request.Name,
        LinkId = Guid.NewGuid()
    };

    var result = await _client.Request(requestLink).ConfigureAwait(false);

    return Ok();
}

A cut down version of my saga looks like:

Request(() => LinkRequest, x => x.RequestId, cfg =>
{
    cfg.ServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
    cfg.SchedulingServiceAddress = new Uri($"rabbitmq://localhost/request_end_point_name");
    cfg.Timeout = TimeSpan.FromSeconds(30);
});

During(RequestReceived,
    When(LinkCreatedEvent)
        .Request(LinkRequest, context => new SelectUrlByPublicId(context.Data.DatabaseId, context.Data.LinkId))
        .TransitionTo(LinkRequest.Pending));

During(LinkRequest.Pending,
    When(LinkRequest.Completed)
        .ThenAsync(context => context.RespondAsync(new RequestLinkCompleted
        {
            CorrelationId = LinkRequest.GetRequestId(context.Instance),
            DatabaseId = context.Data.DatabaseId
        }))
        .Finalize());

And finally in my start up code I configure the request/response as such:

services.AddScoped<IRequestClient<IRequestLink, IRequestLinkCompleted>>(x =>
    new MessageRequestClient<IRequestLink, IRequestLinkCompleted>(
        _bus,
        new Uri($"{messageBusSettings.Host}/create_link_saga"),
        TimeSpan.FromSeconds(30)));

I'm guessing that the RespondAsync call isn't using the right/original RequestId, but I have no idea how to check or change that. Can anyone help?

1 answer
我想做一个坏孩纸
#2 · 2019-02-20 15:06

Since the context of the original request is lost, you need to do essentially what was being done in RespondAsync yourself.

  1. Save the ResponseAddress from the request message context
  2. Save the RequestId from the same context

In your saga, when it's time to respond, you need to use context.GetSendEndpoint(context.Instance.SavedResponseAddress) and then call Send setting the RequestId in the delegate to match the saved RequestId from the original context.

Now, you might need to save these in routing slip variables, since your saga doesn't get the command, just the subsequent events. The net effect is the same either way: the original request message is gone and is never seen by the saga.
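
As a rough illustration of the approach described above, here is a minimal sketch. It assumes the MassTransit 5.x / Automatonymous API that was current when this was posted, and it assumes the saga itself consumes the initial request; if it does not, the same two values would have to reach the saga some other way, for example through routing slip variables. The names LinkSagaState, ILinkReady, LinkRequested, LinkReady, SavedRequestId and the message shapes are hypothetical and exist only for this example; they are not taken from the question's code.

```csharp
using System;
using Automatonymous;
using MassTransit;

// Hypothetical message contracts for this sketch.
public interface IRequestLink { Guid LinkId { get; } }
public interface ILinkReady { Guid LinkId { get; } int DatabaseId { get; } }
public class RequestLinkCompleted
{
    public Guid LinkId { get; set; }
    public int DatabaseId { get; set; }
}

// Saga instance with two extra properties that remember where to reply.
public class LinkSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid? SavedRequestId { get; set; }
    public Uri SavedResponseAddress { get; set; }
}

public class LinkStateMachine : MassTransitStateMachine<LinkSagaState>
{
    public State RequestReceived { get; private set; }
    public Event<IRequestLink> LinkRequested { get; private set; }
    public Event<ILinkReady> LinkReady { get; private set; }

    public LinkStateMachine()
    {
        InstanceState(x => x.CurrentState);
        Event(() => LinkRequested, x => x.CorrelateById(c => c.Message.LinkId));
        Event(() => LinkReady, x => x.CorrelateById(c => c.Message.LinkId));

        Initially(
            When(LinkRequested)
                .Then(context =>
                {
                    // Steps 1 and 2: copy ResponseAddress and RequestId from the
                    // consume context of the original request onto the instance.
                    if (context.TryGetPayload(out ConsumeContext consumeContext))
                    {
                        context.Instance.SavedRequestId = consumeContext.RequestId;
                        context.Instance.SavedResponseAddress = consumeContext.ResponseAddress;
                    }
                })
                .TransitionTo(RequestReceived));

        During(RequestReceived,
            When(LinkReady)
                .ThenAsync(async context =>
                {
                    if (!context.TryGetPayload(out ConsumeContext consumeContext))
                        throw new InvalidOperationException("No ConsumeContext available");

                    // Send straight to the saved address, stamping the saved
                    // RequestId so the waiting request client can match the reply.
                    var endpoint = await consumeContext.GetSendEndpoint(context.Instance.SavedResponseAddress);
                    await endpoint.Send(new RequestLinkCompleted
                    {
                        LinkId = context.Instance.CorrelationId,
                        DatabaseId = context.Data.DatabaseId
                    }, sendContext => sendContext.RequestId = context.Instance.SavedRequestId);
                })
                .Finalize());
    }
}
```

The key point is that the reply is an ordinary Send to the saved address with the RequestId set by hand, instead of a RespondAsync that depends on the context of a request message the saga is no longer handling.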
