The application uses .NET 4.6.1 and the Microsoft.Azure.ServiceBus.EventProcessorHost NuGet package v2.0.2, along with its dependency, the WindowsAzure.ServiceBus package v3.0.1, to process Azure Event Hub messages.
The application has an implementation of IEventProcessor. When an unhandled exception is thrown from the ProcessEventsAsync method, the EventProcessorHost never re-sends those messages to the running instance of IEventProcessor. (Anecdotally, it will re-send if the hosting application is stopped and restarted or if the lease is lost and re-obtained.)
Is there a way to force the event message that resulted in an exception to be re-sent by EventProcessorHost to the IEventProcessor implementation?
One possible solution is presented in this comment on a nearly identical question: Redeliver unprocessed EventHub messages in IEventProcessor.ProcessEventsAsync
The comment suggests holding a copy of the last successfully processed event message and checkpointing explicitly using that message when an exception occurs in ProcessEventsAsync. However, after implementing and testing such a solution, the EventProcessorHost still does not re-send. The implementation is pretty simple:
    // Most recent event from the last batch that was processed without error.
    private EventData _lastSuccessfulEvent;

    public async Task ProcessEventsAsync(
        PartitionContext context,
        IEnumerable<EventData> messages)
    {
        try
        {
            await ProcessEvents(context, messages); // does actual processing, may throw exception
            _lastSuccessfulEvent = messages
                .OrderByDescending(ed => ed.SequenceNumber)
                .First();
        }
        catch (Exception ex)
        {
            // Nothing to checkpoint if the very first batch failed.
            if (_lastSuccessfulEvent != null)
            {
                await context.CheckpointAsync(_lastSuccessfulEvent);
            }
        }
    }
An analysis of things in action:
A partial log sample is available here: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
Simple answer: Have you tried EventProcessorHost.ResetConnection(string partitionId)?
Complex answer: This might be an architecture problem that needs to be addressed on your end. Why did the processing fail? Was it a transient error? Is retrying the processing logic a possible scenario? And so on...
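If the failure is transient, one option is to retry inside ProcessEventsAsync before giving up. Here is a minimal sketch of such a retry wrapper. It is not part of any Azure SDK: RetryHelper, RetryAsync, maxAttempts and baseDelay are hypothetical names, and the doubling back-off is just an assumption picked for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Runs 'action' up to 'maxAttempts' times, doubling the delay after each failure.
    // The final failure is rethrown so the caller can decide what to do next.
    public static async Task RetryAsync(Func<Task> action, int maxAttempts, TimeSpan baseDelay)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await action().ConfigureAwait(false);
                return; // success, stop retrying
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Hypothetical policy: baseDelay, then 2x, 4x, ... (keep maxAttempts small).
                var delay = TimeSpan.FromTicks(baseDelay.Ticks * (1L << (attempt - 1)));
                await Task.Delay(delay).ConfigureAwait(false);
            }
        }
    }
}
```

Inside ProcessEventsAsync this could be called as `await RetryHelper.RetryAsync(() => ProcessEvents(context, messages), 3, TimeSpan.FromSeconds(1));`. Whether retrying a whole batch is safe depends on your processing being idempotent.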
TLDR: The only reliable way to replay a failed batch of events to IEventProcessor.ProcessEventsAsync is to shut down the EventProcessorHost (aka EPH) immediately, either by calling eph.UnregisterEventProcessorAsync() or by terminating the process, depending on the situation. This lets other EPH instances acquire the lease for this partition and start from the previous checkpoint.
Before explaining this, I want to call out that this is a great question and that this was indeed one of the toughest design choices we had to make for EPH. In my view, it was a trade-off that favored the usability/supportability of the EPH framework over technical correctness.
Ideal situation would have been: when the user code in IEventProcessorImpl.ProcessEventsAsync throws an Exception, the EPH library shouldn't catch it. It should have let this Exception crash the process, so that the crash dump clearly shows the call stack responsible. I still believe this is the most technically correct solution.
Current situation: the contract between the IEventProcessorImpl.ProcessEventsAsync API and EPH is:
As long as EventData can be received from the EventHubs service, EPH continues invoking the user callback (IEventProcessorImplementation.ProcessEventsAsync) with the EventData, and if the user callback throws while being invoked, EPH notifies EventProcessorOptions.ExceptionReceived.
IEventProcessorImpl.ProcessEventsAsync should handle all errors and incorporate retries as necessary. EPH doesn't set any timeout on this callback, to give users full control over processing time.
If a particular event is the problem, mark the EventData with a special property, e.g. type=poison-event, and re-send it to the same EventHub (include a pointer to the actual event: copy its EventData.Offset and SequenceNumber into the new EventData.ApplicationProperties), or forward it to a Service Bus queue, or store it elsewhere. Basically, identify the poison event and defer processing it.
If Exceptions still occur, catch them and shut down EPH or fail-fast the process with this exception. When the EPH comes back up, it will start from where it left off.
Why does checkpointing 'the old event' NOT work (read this to understand EPH in general):
Behind the scenes, EPH runs a pump per EventHub consumer group partition receiver. The pump's job is to start the receiver from a given checkpoint (if present), create a dedicated instance of the IEventProcessor implementation, receive from the designated EventHub partition starting at the Offset specified in the checkpoint (if not present, EventProcessorOptions.initialOffsetProvider), and eventually invoke IEventProcessorImpl.ProcessEventsAsync. The purpose of the checkpoint is to be able to reliably resume processing messages when the EPH process shuts down and ownership of the partition moves to another EPH instance. So the checkpoint is consumed only while starting the pump and is NOT read again once the pump has started.
As I am writing this, EPH is at version 2.2.10...
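To illustrate the shutdown advice in the TLDR, here is a rough sketch of how a failed batch could stop further processing and trigger a host shutdown. Everything in it is hypothetical: FailureGate and its members are made-up names, and the shutdown delegate is a placeholder that in a real host might call eph.UnregisterEventProcessorAsync() or Environment.FailFast. The shutdown is started on a separate task instead of being awaited inside the callback, on the unverified assumption that unregistering may wait for in-flight callbacks to finish.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class FailureGate
{
    private readonly Func<Task> _shutdownHost; // placeholder for the real shutdown call
    private int _tripped;                      // 0 = healthy, 1 = a batch has failed

    public FailureGate(Func<Task> shutdownHost)
    {
        if (shutdownHost == null) throw new ArgumentNullException(nameof(shutdownHost));
        _shutdownHost = shutdownHost;
    }

    // Completes when the requested shutdown has finished (already completed if none was requested).
    public Task Shutdown { get; private set; } = Task.CompletedTask;

    public bool IsTripped => Volatile.Read(ref _tripped) == 1;

    // Call from ProcessEventsAsync, passing the real processing (and checkpointing) as 'processBatch'.
    public async Task RunAsync(Func<Task> processBatch)
    {
        if (IsTripped) return; // skip later batches so nothing past the failure gets checkpointed

        try
        {
            await processBatch().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // Only the first failure starts the shutdown; it runs in the background.
            if (Interlocked.Exchange(ref _tripped, 1) == 0)
            {
                Shutdown = Task.Run(_shutdownHost);
            }
        }
    }
}
```

Because the failed batch is never checkpointed, whichever EPH instance picks up the lease next should start from the last checkpoint and see that batch again.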
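The poison-event suggestion above boils down to attaching a marker plus a pointer to the original event. A minimal sketch of just that bookkeeping, deliberately kept free of SDK types: PoisonEvents and the property key names are hypothetical, and how the dictionary is copied onto the new event (the answer calls the target ApplicationProperties; the exact member name depends on your SDK version and is not verified here) is left to the caller.

```csharp
using System;
using System.Collections.Generic;

public static class PoisonEvents
{
    // Hypothetical property names; use whatever your consumers agree on.
    public const string TypeKey = "type";
    public const string TypeValue = "poison-event";
    public const string OffsetKey = "original-offset";
    public const string SequenceNumberKey = "original-sequence-number";

    // Builds the properties to attach to the replacement event that points back at the original.
    public static IDictionary<string, object> BuildProperties(string originalOffset, long originalSequenceNumber)
    {
        if (originalOffset == null) throw new ArgumentNullException(nameof(originalOffset));

        return new Dictionary<string, object>
        {
            { TypeKey, TypeValue },
            { OffsetKey, originalOffset },
            { SequenceNumberKey, originalSequenceNumber },
        };
    }

    // Lets a consumer recognise deferred events so they can be routed to special handling.
    public static bool IsPoison(IDictionary<string, object> properties)
    {
        object value;
        return properties != null
            && properties.TryGetValue(TypeKey, out value)
            && TypeValue.Equals(value);
    }
}
```

A processor would call BuildProperties with the failing event's Offset and SequenceNumber, copy the entries onto a new event, send it, and then carry on with the rest of the batch.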
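The point about the checkpoint being read only when the pump starts can be shown with a toy model. This is emphatically not EPH's code: ToyCheckpointStore and ToyPump are invented for illustration, and events are reduced to bare sequence numbers.

```csharp
using System;

// Toy stand-in for a checkpoint store holding one sequence number for one partition.
public sealed class ToyCheckpointStore
{
    public long LastSequenceNumber { get; set; }
}

public sealed class ToyPump
{
    private readonly ToyCheckpointStore _store;
    private long _cursor; // in-memory receive position

    public ToyPump(ToyCheckpointStore store)
    {
        if (store == null) throw new ArgumentNullException(nameof(store));
        _store = store;
        _cursor = store.LastSequenceNumber; // the only place the checkpoint is read
    }

    // Returns the next sequence number after the in-memory cursor.
    public long ReceiveNext()
    {
        _cursor++;
        return _cursor;
    }

    // Writes the checkpoint; the running pump's cursor is deliberately left untouched.
    public void Checkpoint(long sequenceNumber)
    {
        _store.LastSequenceNumber = sequenceNumber;
    }
}
```

With a store starting at 0, a pump that receives 1, 2, 3 and then checkpoints 1 still receives 4 next. Only a new ToyPump built on the same store starts over at 2, which mirrors why checkpointing an older event has no effect until the host restarts or the lease moves.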