I am trying to use the new feature of KCL library in Java for AWS Kinesis to do a graceful shutdown by registering with shutdown hook to stop all the record processors and then the worker gracefully. The new library provides a new interface which record processors needs to be implemented. But how does it get invoked?
Tried invoking first the worker.requestShutdown() then worker.shutdown() and it works. But is it any intended way to use it. What is the use then to use both, and its benefit?
Starting a consumer
As you might know that when you create a
Worker
, it1) creates the consumer offset table in dynamodb
2) create leases, schedule lease taker and lease renewer at configured interval of time
If you have two partitions, then there will be two records in your same dynamodb table, meaning partition needs a lease.
eg.
leaseCoordinatorThreadPool
)3) Then for each partition in the stream,
Worker
creates an internal PartitionConsumer, which actually fetches the events, and dispatches to yourRecordProcessor#processRecords
. see ProcessTask#call4) on your question, you have to register your
IRecordProcessorFactory
impl to theworker
, which will give oneProcessorFactoryImpl
to eachPartitionConsumer
.eg. see example here, which might be helpful
// then start a consumer
Stopping a consumer
Now, when you want to stop your Consumer instance(
Worker
), you don't need to deal much with eachPartitionConsumer
, which will be taken care byWorker
once you ask it to shut down.with
shutdown
, it asks theleaseCoordinatorThreadPool
to stop, which was responsible for renewing and taking leases, and awaits for termination.requestShutdown
on the other hand cancels the lease taker, AND notifies thePartitionConsumer
s about the shutdown.And more important thing with
requestShutdown
is if you want to get notified on yourRecordProcessor
then you can implementIShutdownNotificationAware
as well. That way in case of race condition when yourRecordProcessor
is processing an event but worker is about to shut down, you should still be able to commit your offset and then shutdown.requestShutdown
returns aShutdownFuture
, which then calls back worker.shutdownYou will have to implement following method on your
RecordProcessor
to get notified onrequestShutdown
,But but if you loose the lease before notifying then it might not be called.
Summary to your questions
IRecordProcessorFactory
andIRecordProcessor
.RecordProcessorFactory
to yourWorker
.You should use
requestShutdown()
for graceful shutdown, which will take care of race-condition. It was introduced in kinesis-client-1.7.1