After restarting a Kafka Connect S3 sink task, it began writing again from the very beginning of the topic and produced duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.
My understanding is that Kafka Connect stores its current offset position in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.
The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries, three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, and commit-<name>.
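For reference, a quick way to inspect these internal topics is the console consumer (assuming a broker is reachable at localhost:9092; Ctrl-C to stop):
/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning
/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-configs --from-beginning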
I manually created the internal Kafka Connect topics as specified in the docs before running this:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
I can verify that the connect-offsets topic seems to be created correctly:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
This is on a three-server cluster running Confluent Platform v3.2.1 with Kafka 0.10.2.1.
Is connect-offsets supposed to be empty? Why else would Kafka Connect restart from the beginning of the topic when a task is restarted?
UPDATE: Response to Randall Hauch's answer.
- The explanation of source connector offsets vs. sink connector offsets explains the empty connect-offsets topic. Thanks for the explanation!
- I'm definitely not changing the connector name.
- If the connector is down for ~five days and then restarted, is there any reason the connector's offset position would expire and reset? I see that __consumer_offsets has cleanup.policy=compact. auto.offset.reset should only take effect if there is no position in __consumer_offsets, right? (See the consumer group check below.)
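In case it's useful, this is how I'd check the sink's committed positions in __consumer_offsets, assuming the sink connector's consumer group follows the usual connect-<connector name> naming (so connect-my-s3-sink for the config below):
/usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --describe --group connect-my-s3-sink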
I'm using mostly system defaults. My sink config JSON is as follows. I'm using a very simple custom partitioner to partition on an Avro datetime field rather than on wall-clock time. That capability appears to have been added in Confluent v3.2.2, so I won't need a custom plugin for it going forward. I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it is available.
{
  "name": "my-s3-sink",
  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",
  "partition.field.name": "timestamp",
  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "schema.compatibility": "NONE",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
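For completeness, the connector and task state can also be checked through the Connect REST API (assuming the worker's REST interface is on the default localhost:8083):
curl http://localhost:8083/connectors/my-s3-sink/status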