How to implement a saga using a scatter/Gather pat

Posted 2019-03-11 14:13

Question:

Jimmy Bogard describes a McDonald's fast food kitchen here, comparing it to the scatter-gather pattern.

[Figure: workflow diagram from the article above]

Initial Implementation Thoughts:

The idea is to have a common interface for all of the FoodOrdered event types that every food station receives. Each station would then consume the orders for its own item, prepare that item, and publish a common done event. Example: the fries and burger stations both get a message for an order of fries; the fries station consumes the order and announces an ItemDoneEvent that the saga is listening for.
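A minimal sketch of that idea, in plain C# with no MassTransit types. Every name here (IFoodOrdered, FoodOrdered, FoodStation, OrderProgress) is hypothetical, and OrderProgress is only a stand-in for the saga state, which in this sketch tracks nothing but the number of outstanding items:

```csharp
using System;

// Hypothetical common contract for every "food ordered" message.
public interface IFoodOrdered
{
    Guid OrderId { get; }   // correlates the item back to its order
    string Item { get; }    // e.g. "Fries" or "Burger"
}

public sealed class FoodOrdered : IFoodOrdered
{
    public FoodOrdered(Guid orderId, string item) { OrderId = orderId; Item = item; }
    public Guid OrderId { get; }
    public string Item { get; }
}

// The single "done" event that every station publishes, whatever it cooked.
public sealed class ItemDoneEvent
{
    public ItemDoneEvent(Guid orderId, string item) { OrderId = orderId; Item = item; }
    public Guid OrderId { get; }
    public string Item { get; }
}

// A station sees every order but only cooks its own item.
public sealed class FoodStation
{
    private readonly string _item;
    public FoodStation(string item) { _item = item; }

    public bool CanHandle(IFoodOrdered order) => order.Item == _item;

    public ItemDoneEvent Cook(IFoodOrdered order)
    {
        if (!CanHandle(order))
            throw new InvalidOperationException($"This station does not cook {order.Item}.");
        return new ItemDoneEvent(order.OrderId, order.Item);
    }
}

// Stand-in for the saga state: it does not care what was cooked, only how much is left.
public sealed class OrderProgress
{
    private int _remaining;
    public OrderProgress(int expectedItems) { _remaining = expectedItems; }
    public bool IsComplete => _remaining == 0;
    public void Apply(ItemDoneEvent done) { if (_remaining > 0) _remaining--; }
}
```

Note that the counter assumes each done event is delivered exactly once; a real saga would probably need to track item identities to cope with redelivery.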

Initial Concerns:

Since the saga doesn't care which type of food was completed, only that all of the food is completed, this seems like an OK solution. However, after reading the warnings here regarding sharing of queues, and noticing that Consumer.Conditional filtering was removed in MassTransit 3.0, it feels as though the framework is saying "Bad Things(TM) will happen" with this type of approach. But I'm not sure how else you would do it without creating a request message, a response message, and a correlating event for each food item in the kitchen, e.g. FriesOrdered, BurgerOrdered, FriesCooked, BurgerCooked. That would be very tedious if you had to do it for every item in the kitchen.

Given the above concerns - what would a good saga example for this type of workflow look like?

Answer 1:

Couldn't you "simply" pass the object along in the queue, as an event parameter? When the saga listener gets an "order completed" event, wouldn't that event contain the object that was completed?

I imagine it being sent to the queue via a generic method, where the object must implement IFoodOrdered.

Then you can implement a virtual method on it that the saga uses to do the "generic" thing when the message is picked up, and you only have to implement overrides for the special items that require something special to happen.
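A rough sketch of what this answer seems to describe, using an in-memory queue instead of a real bus. Only the interface name IFoodOrdered comes from the answer; its members, FoodItem, IceCream, and KitchenQueue are assumptions made for illustration:

```csharp
using System;
using System.Collections.Generic;

// Contract named in the answer; the members are assumptions.
public interface IFoodOrdered
{
    string Name { get; }
    string Complete();   // the "generic" thing the saga does when the item comes back
}

// Default behaviour shared by most items.
public class FoodItem : IFoodOrdered
{
    public FoodItem(string name) { Name = name; }
    public string Name { get; }
    public virtual string Complete() => $"{Name} added to tray";
}

// Only items that need special handling override the virtual method.
public sealed class IceCream : FoodItem
{
    public IceCream() : base("Ice cream") { }
    public override string Complete() => $"{Name} kept in freezer until pickup";
}

// In-memory stand-in for the real queue.
public sealed class KitchenQueue
{
    private readonly Queue<IFoodOrdered> _items = new Queue<IFoodOrdered>();

    // Generic send: anything implementing IFoodOrdered is accepted.
    public void Send<T>(T item) where T : IFoodOrdered => _items.Enqueue(item);

    // The saga side: drain the queue and run each item's completion step.
    public List<string> CompleteAll()
    {
        var results = new List<string>();
        while (_items.Count > 0)
            results.Add(_items.Dequeue().Complete());
        return results;
    }
}
```

Putting behaviour on the message object works in-process; across a real message bus the message is serialized, so the virtual method would have to live on the consuming side instead.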



Answer 2:

The problem with kicking back finished events to the saga is that it creates contention on a shared resource (i.e. the saga state).

Jim has another post that came after the one you referenced that outlines the issue and solution. Of course, he's specifically talking about NServiceBus, but the problem and concepts are the same.

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

Create an external store and put in a record for each work item. Let each worker mark its own work as completed, while the saga effectively polls, using delayed messaging, to see whether all work is done.

Then you are still doing scatter-gather, but the "aggregator" has been replaced by the process manager pattern to reduce contention.
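As an illustration only, the external store could look roughly like this. WorkItemStore and its methods are hypothetical names, and an in-memory dictionary stands in for whatever database would actually be used:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Hypothetical external store: one record per work item, keyed by (order, item).
public sealed class WorkItemStore
{
    private readonly ConcurrentDictionary<(Guid OrderId, string Item), bool> _done =
        new ConcurrentDictionary<(Guid OrderId, string Item), bool>();

    // Called by the saga when it scatters the work.
    public void Register(Guid orderId, string item) => _done[(orderId, item)] = false;

    // Called by a worker; it touches only its own record, never the saga state.
    public void MarkDone(Guid orderId, string item) => _done[(orderId, item)] = true;

    // Called by the saga each time its delayed "check" message arrives.
    public bool AllDone(Guid orderId)
    {
        var items = _done.Where(kv => kv.Key.OrderId == orderId).ToList();
        return items.Count > 0 && items.All(kv => kv.Value);
    }
}
```

The saga would call Register for each item before publishing the work, then call AllDone whenever its scheduled check fires, rescheduling the check while the answer is false.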



Answer 3:

I ran into a similar problem: I need to publish a few dozen commands (all with the same interface, IMyRequest) and wait for all of them.

Actually, my command starts another saga, which publishes IMyRequestDone at the end of processing without marking the saga as completed (the child sagas need to be completed at some later time). So instead of storing the number of completed nested sagas in the parent saga, I just query the state of the child saga instances.

Check on every MyRequestDone message:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
    // timeout for all requests
    x.Delay = TimeSpan.FromMinutes(10);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});


During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
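            // Scatter: one command per unit of work. The inner lambda parameter is
            // renamed because it cannot reuse the name of the enclosing `context`.
            await context.Publish(ctx => new MyRequestCommand(ctx.Instance, "foo"));
            await context.Publish(ctx => new MyRequestCommand(ctx.Instance, "bar"));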

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);

During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)
);

Alternatively, periodically check that all requests are done (following "Reducing NServiceBus Saga load"):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
    // check interval
    x.Delay = TimeSpan.FromSeconds(15);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});

During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
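            // Scatter: one command per unit of work. The inner lambda parameter is
            // renamed because it cannot reuse the name of the enclosing `context`.
            await context.Publish(ctx => new MyRequestCommand(ctx.Instance, "foo"));
            await context.Publish(ctx => new MyRequestCommand(ctx.Instance, "bar"));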

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(CheckAllRequestsDone.Received)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);