How do I use the requestShutdown and shutdown to d

Posted 2020-07-20 23:55

I am trying to use a new feature of the KCL (Kinesis Client Library) for Java with AWS Kinesis to shut down gracefully: I register a shutdown hook that stops all the record processors and then the worker. The new library provides a new interface that record processors need to implement. But how does it get invoked?

I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is that the intended way to use them? What is the point of using both, and what is the benefit?

1 answer
Fickle 薄情
#2 · 2020-07-21 00:50

Starting a consumer

As you may know, when you create a Worker, it:

1) creates the consumer offset (lease) table in DynamoDB

2) creates the leases, and schedules the lease taker and lease renewer at the configured interval

If you have two partitions (shards), there will be two records in the same DynamoDB table, because each partition needs its own lease.

For example:

{
  "checkpoint": "TRIM_HORIZON",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 38,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}

{
  "checkpoint": "49570828493343584144205257440727957974505808096533676050",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 40,
  "leaseKey": "shardId-000000000001",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}
  • Taking and renewing leases is scheduled by the lease coordinator's ScheduledExecutorService (called leaseCoordinatorThreadPool).

3) Then, for each partition in the stream, the Worker creates an internal PartitionConsumer, which actually fetches the events and dispatches them to your RecordProcessor#processRecords (see ProcessTask#call).

4) As for your question: you have to register your IRecordProcessorFactory implementation with the worker, which then uses it to create one record processor for each PartitionConsumer.

For example, the following might be helpful:

KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
 "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
            .withKinesisClientConfig(getHttpConfiguration())
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // TRIM_HORIZON = start from the oldest record still available in the stream (LATEST would start from the tip)

Worker consumerWorker = new Worker.Builder()
            .recordProcessorFactory(new DavidsEventProcessorFactory())
            .config(streamConfig)
            .dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration())))
            .build();


public class DavidsEventProcessorFactory implements IRecordProcessorFactory {

    private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    @Override
    public IRecordProcessor createProcessor() {
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {

    private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    //TODO add consumername ?

    private String partitionId;

    private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;

    public DavidsEventPartitionProcessor() {
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        recordsInput.getRecords().forEach(nativeEvent -> {
            String eventPayload = new String(nativeEvent.getData().array());
            logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload);

            //update offset after configured amount of retries
            try {
                recordsInput.getCheckpointer().checkpoint();
                logger.debug("Persisted the consumer offset to {} for partition {}",
                        nativeEvent.getSequenceNumber(), partitionId);
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
                e.printStackTrace();
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
                e.printStackTrace();
            }
        });
    }

    @Override
    public void shutdown(ShutdownInput shutdownReason) {
        logger.debug("Shutting down event processor for {}", partitionId);

        if(shutdownReason.getShutdownReason() == RE_PARTITIONING) {
            try {
                shutdownReason.getCheckpointer().checkpoint();
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
                e.printStackTrace();
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
                e.printStackTrace();
            }
        }
    }

}

// then start a consumer

consumerWorker.run();
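
To make the lease bookkeeping from steps 1 and 2 more concrete, here is a minimal in-memory sketch. It is not KCL code: `Lease`, `LeaseTable`, `takeLease` and `renewLease` are hypothetical names, the real library keeps these rows in DynamoDB, and the sketch assumes that taking or renewing a lease bumps `leaseCounter`. It also ignores lease expiry and stealing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory copy of one lease row (one row per shard). */
final class Lease {
    final String leaseKey;   // the shard id, e.g. "shardId-000000000000"
    String leaseOwner;       // worker id that currently holds the lease, or null
    long leaseCounter;       // assumed to be bumped on every take and renewal

    Lease(String leaseKey) {
        this.leaseKey = leaseKey;
    }
}

/** Hypothetical stand-in for the DynamoDB lease table. */
final class LeaseTable {
    private final Map<String, Lease> leases = new LinkedHashMap<>();

    /** Step 2: one lease per shard; creating it twice has no effect. */
    void createLeaseIfAbsent(String shardId) {
        leases.putIfAbsent(shardId, new Lease(shardId));
    }

    /** Lease taker: succeeds only if the lease exists and nobody owns it yet. */
    boolean takeLease(String shardId, String workerId) {
        Lease lease = leases.get(shardId);
        if (lease == null || lease.leaseOwner != null) {
            return false;
        }
        lease.leaseOwner = workerId;
        lease.leaseCounter++;
        return true;
    }

    /** Lease renewer: only the current owner may renew. */
    boolean renewLease(String shardId, String workerId) {
        Lease lease = leases.get(shardId);
        if (lease == null || !workerId.equals(lease.leaseOwner)) {
            return false;
        }
        lease.leaseCounter++;
        return true;
    }

    int size() {
        return leases.size();
    }

    long counterOf(String shardId) {
        return leases.get(shardId).leaseCounter;
    }
}
```

In this model, a worker that stops renewing simply stops incrementing the counter, which is why stopping the lease taker and renewer is central to the shutdown calls discussed next.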

Stopping a consumer

Now, when you want to stop your consumer instance (the Worker), you don't need to deal with each PartitionConsumer yourself; the Worker takes care of them once you ask it to shut down.

  • shutdown asks the leaseCoordinatorThreadPool, which is responsible for renewing and taking leases, to stop, and then waits for it to terminate.

  • requestShutdown, on the other hand, cancels the lease taker AND notifies the PartitionConsumers about the shutdown.

More importantly, if you want your RecordProcessor to be notified of a requestShutdown, you can also implement IShutdownNotificationAware. That way, if the worker is about to shut down while your RecordProcessor is still processing an event (a race condition), you can still commit your offset before shutting down.

requestShutdown returns a ShutdownFuture, which then calls back into worker.shutdown.
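
One way to combine the two calls in a shutdown hook is to request the graceful shutdown first and fall back to the hard one only if it does not finish in time. The sketch below is a plain-Java model of that idea, not KCL code: `StoppableWorker` is a hypothetical stand-in for the two Worker methods, the graceful result is modeled as a `java.util.concurrent.Future<Void>` (an assumption about the real return type), and `GracefulStop`, `stop`, `asShutdownHook` and the timeout are illustrative names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the two Worker calls discussed above. */
interface StoppableWorker {
    Future<Void> requestShutdown(); // graceful: notify processors, then stop
    void shutdown();                // hard stop
}

final class GracefulStop {
    private GracefulStop() {
    }

    /**
     * Requests a graceful shutdown and waits up to timeoutMillis for it.
     * Falls back to shutdown() on timeout, failure, or interruption.
     * Returns true only if the graceful path completed.
     */
    static boolean stop(StoppableWorker worker, long timeoutMillis) {
        Future<Void> pending = worker.requestShutdown();
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            worker.shutdown();
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            worker.shutdown();
            return false;
        }
    }

    /** Wraps stop() in a thread suitable for Runtime#addShutdownHook. */
    static Thread asShutdownHook(StoppableWorker worker, long timeoutMillis) {
        return new Thread(() -> stop(worker, timeoutMillis), "worker-shutdown-hook");
    }
}
```

With the real library you would presumably adapt your Worker to `StoppableWorker` and register the hook with `Runtime.getRuntime().addShutdownHook(GracefulStop.asShutdownHook(adapter, 10_000))` before calling `consumerWorker.run()`, since `run()` blocks the calling thread.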

You will have to implement the following method on your RecordProcessor to be notified of a requestShutdown:

class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {

    private String partitionId;

    // other IRecordProcessor methods omitted

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        logger.debug("Shutdown requested for {}", partitionId);
    }

}

However, if you lose the lease before the notification is delivered, it might not be called.
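
The sketch below illustrates both points: committing pending progress when the shutdown notification arrives, and tolerating a checkpoint that fails because the lease is already gone. It is a plain-Java model under stated assumptions: `OffsetCheckpointer` is a hypothetical stand-in for IRecordProcessorCheckpointer (only a no-argument `checkpoint()` is modeled), and `ShutdownAwareProcessor` and its counters are illustrative, not part of the library.

```java
import java.util.List;

/** Hypothetical stand-in for IRecordProcessorCheckpointer. */
interface OffsetCheckpointer {
    void checkpoint() throws Exception;
}

/** Hypothetical processor showing where the final commit would go. */
final class ShutdownAwareProcessor {
    private int pendingRecords = 0;          // processed but not yet checkpointed
    private boolean finalCheckpointDone = false;

    /** Processes a batch without checkpointing, so its progress stays pending. */
    void processRecords(List<String> payloads) {
        pendingRecords += payloads.size();
    }

    /**
     * Mirrors shutdownRequested(checkpointer): commit whatever is pending
     * before the worker stops this processor.
     */
    void shutdownRequested(OffsetCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
            pendingRecords = 0;
            finalCheckpointDone = true;
        } catch (Exception e) {
            // For example the lease was lost first. Nothing more can be committed
            // here; the pending records stay uncommitted in this model.
            finalCheckpointDone = false;
        }
    }

    int pendingRecords() {
        return pendingRecords;
    }

    boolean finalCheckpointDone() {
        return finalCheckpointDone;
    }
}
```

In the real processor the same try/catch shape would apply to `checkpointer.checkpoint()` inside `shutdownRequested`, catching the exceptions already handled in `processRecords` above.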

Summary of answers to your questions

The new library provides a new interface that record processors need to implement. But how does it get invoked?

  • Implement an IRecordProcessorFactory and an IRecordProcessor.
  • Then wire your RecordProcessorFactory into your Worker.

I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is that the intended way to use them?

You should use requestShutdown() for a graceful shutdown, which takes care of the race condition. It was introduced in kinesis-client-1.7.1.
