MassTransit saga with Redis persistence gives Meth

Posted 2019-08-24 04:53

Question:

I'm trying to add Redis persistence to my saga which is managing calls to a routing slip (as well as additional messages to other consumers depending on the result of the routing slip) in the hopes that it will solve another timeout issue I keep getting.

However, I get an error message that ends up in my saga_error queue in RabbitMQ.

The error shown in the message is:

Method 'Accept' in type 'GreenPipes.DynamicInternal.Automatonymous.State' from assembly 'AutomatonymousGreenPipes.DynamicInternalc83411641fad46798326d78fe60522c9, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' does not have an implementation

My correlation configuration code is:

InstanceState(s => s.CurrentState);

Event(() => RequestLinkEvent, x => x.CorrelateById(context => context.Message.LinkId).SelectId(y => y.Message.LinkId));
Event(() => LinkCreatedEvent, x => x.CorrelateById(context => context.Message.LinkId));
Event(() => CreateLinkGroupFailedEvent, x => x.CorrelateById(context => context.Message.LinkId));
Event(() => CreateLinkFailedEvent, x => x.CorrelateById(context => context.Message.LinkId));
Event(() => RequestLinkFailedEvent, x => x.CorrelateById(context => context.Message.LinkId));

Request(() => LinkRequest, x => x.UrlRequestId, cfg =>
{
    cfg.ServiceAddress = new Uri($"{hostAddress}/{nameof(SelectUrlByPublicId)}");
    cfg.SchedulingServiceAddress = new Uri($"{hostAddress}/{nameof(SelectUrlByPublicId)}");
    cfg.Timeout = TimeSpan.FromSeconds(30);
});

The LinkId in the above code is always a unique Guid.

The issue seems to happen when the saga is reading back an event that has been sent from my routing slip (be it a success or failure event).

An example event interface that is not working is:

public interface ILinkCreated
{
    Guid? CorrelationId { get; set; }
    int DatabaseId { get; set; }
    Guid LinkId { get; set; }
    string LinkName { get; set; }
}

If I switch back to an InMemorySagaRepository everything works (locally). I've tried so many different combinations of things and have now hit a brick wall.

I've updated all packages to the latest version. I've also been checking my Redis database and can see that the state machine instance is written correctly each time.

I also saw that someone on Google Groups had the same issue, but there's no response to their post.

Answer 1:

The problem here is request-response.

It works like this:

  1. MT stores the request ID in the saga state property UrlRequestId
  2. The request is sent
  3. You get a response back; its headers contain the requestor address and the request ID
  4. MT uses the saga repository to find your instance using repo.Find(x => x.UrlRequestId == message.Headers.RequestId) (this is not the real code, but it is effectively what happens)
  5. Redis (like any other key-value store) doesn't support queries, so the saga repository doesn't support them either, and you get a "not implemented" exception
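To illustrate steps 4 and 5, here is a minimal sketch of a key-value style store. It is not MassTransit's code: KeyValueSagaStore, LinkSagaState, Save, Load and Find are hypothetical names made up for this example, and only UrlRequestId comes from the question. It shows a lookup by key succeeding and a lookup by another property failing because it would need a query.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical saga state; only UrlRequestId is taken from the question.
public class LinkSagaState
{
    public Guid CorrelationId { get; set; }
    public Guid? UrlRequestId { get; set; }
}

// Hypothetical stand-in for a repository backed by a key-value store.
public class KeyValueSagaStore
{
    private readonly Dictionary<Guid, LinkSagaState> _byKey =
        new Dictionary<Guid, LinkSagaState>();

    public void Save(LinkSagaState state) => _byKey[state.CorrelationId] = state;

    // Lookup by key is the one thing a key-value store does well.
    public LinkSagaState Load(Guid correlationId) =>
        _byKey.TryGetValue(correlationId, out var state) ? state : null;

    // Lookup by any other property would need a query, which is not available.
    public LinkSagaState Find(Func<LinkSagaState, bool> predicate) =>
        throw new NotImplementedException("Queries are not supported by this store.");
}

public static class Program
{
    public static void Main()
    {
        var store = new KeyValueSagaStore();
        var saga = new LinkSagaState
        {
            CorrelationId = Guid.NewGuid(),
            UrlRequestId = Guid.NewGuid()
        };
        store.Save(saga);

        // Found, because the correlation ID is the key.
        Console.WriteLine(store.Load(saga.CorrelationId) != null);

        try
        {
            // A response is matched by request ID, which is not the key.
            store.Find(s => s.UrlRequestId == saga.UrlRequestId);
        }
        catch (NotImplementedException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```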

Your correlation specification for responses has no effect, since Request always uses headers to find the saga instance to which the response belongs.

You can work around this by not using request-response and instead emitting an event using context.Publish(new LinkCreatedEvent { ... , CorrelationId = context.Message.CorrelationId }) and relying on the usual correlation.
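A rough sketch of that workaround, under some assumptions: ICreateLink and CreateLinkConsumer are hypothetical names, the databaseId value is a placeholder, and the event is published through the ILinkCreated interface from the question using an anonymous object rather than a concrete LinkCreatedEvent class. Since the saga in the question correlates on LinkId, that is the value carried here.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical command handled by the service that creates the link.
public interface ICreateLink
{
    Guid LinkId { get; }
    string LinkName { get; }
}

// Event contract as shown in the question.
public interface ILinkCreated
{
    Guid? CorrelationId { get; set; }
    int DatabaseId { get; set; }
    Guid LinkId { get; set; }
    string LinkName { get; set; }
}

// Hypothetical consumer: publishes an event instead of responding to a request.
public class CreateLinkConsumer : IConsumer<ICreateLink>
{
    public async Task Consume(ConsumeContext<ICreateLink> context)
    {
        // Placeholder for the real work; this value is made up.
        int databaseId = 0;

        await context.Publish<ILinkCreated>(new
        {
            CorrelationId = (Guid?)context.Message.LinkId,
            DatabaseId = databaseId,
            context.Message.LinkId,
            context.Message.LinkName
        });
    }
}
```

The saga would then pick the event up through its existing Event(() => LinkCreatedEvent, x => x.CorrelateById(context => context.Message.LinkId)) declaration, so no lookup by request ID is needed.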



Answer 2:

So to answer my own question and perhaps shine a light on my own stupidity: the issue was in fact caused by how I had set up my StateMachineInstance.

Instead of having CurrentState of type State as below:

public State CurrentState { get; set; }

I should have declared it as a string:

public string CurrentState { get; set; }

Now it can be deserialized into the object correctly. I suspect this may have been causing my timeout issues with the InMemorySagaRepository on my staging server too.
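For context, a saga instance class with the string-typed state might look roughly like the sketch below. The class name LinkSagaState is hypothetical, and the sketch assumes the Automatonymous-based MassTransit packages from around the time of this thread, where SagaStateMachineInstance lives in the Automatonymous namespace. Any extra interface a particular repository requires is left out.

```csharp
using System;
using Automatonymous;

// Hypothetical instance class; only CurrentState and UrlRequestId appear in the thread.
public class LinkSagaState : SagaStateMachineInstance
{
    // Key under which the saga instance is stored and correlated.
    public Guid CorrelationId { get; set; }

    // Stored as the state's name, so it serializes as plain text.
    public string CurrentState { get; set; }

    // Written by the Request(...) configuration when a request is sent.
    public Guid? UrlRequestId { get; set; }
}
```

The InstanceState(s => s.CurrentState) line from the question stays the same with this property type.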