Consumer reads only from a specific broker in a 3

2019-09-21 21:57发布

The scenario

I have started all the components for a multi-broker configuration in a single machine using the shell scripts found in https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz

  1. Started zookeeper with the zookeeper-properties
  2. Started 3 brokers with 3 different server properties. They differentiate only on these config values
broker.id
log.dirs
port

I have also tried to change offsets.topic.replication.factor and transaction.state.log.replication.factor, but I don't believe they are relevant.

Note: the order how I started the brokers is 0 , 1 , 2

  1. Create a topic with replication factor 3 and one partition
bin/kafka-topics.sh --create --topic repl_topic --zookeeper localhost:2181 --replication-factor 3 --partitions 1
  1. Started console producer and consumer
bin/kafka-console-producer.sh --topic repl_topic --broker-list localhost:9092,localhost:9093,localhost:9094
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 -topic repl_topic --from-beginning

Producer and consumer appear to work correctly. However if I shutdown by Ctrl-C the broker 0 (even if it is not leader), the consumer receives a warning but it doesn't receive anymore message from the producer. Only when broker 0 will be up again, the consumer will receive all the messages.

My conclusion

The consumer is dependent on broker 0 only. It doesn't interact with the others.

My question

Why?

1条回答
地球回转人心会变
2楼-- · 2019-09-21 22:17

I have finally figured out the problem and I have fixed that. Checking the __consumer_offsets, I have noticed it is not replicated.

 bin/kafka-topics.sh --topic __consumer_offsets --zookeeper localhost:2181 --describe
Topic:__consumer_offsets    PartitionCount:50   ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 4    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 5    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 6    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 7    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 8    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 9    Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 10   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 11   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 12   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 13   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 14   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 15   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 16   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 17   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 18   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 19   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 20   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 21   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 22   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 23   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 24   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 25   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 26   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 27   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 28   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 29   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 30   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 31   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 32   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 33   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 34   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 35   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 36   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 37   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 38   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 39   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 40   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 41   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 42   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 43   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 44   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 45   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 46   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 47   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 48   Leader: 0   Replicas: 0 Isr: 0
    Topic: __consumer_offsets   Partition: 49   Leader: 0   Replicas: 0 Isr: 0

Indeed, the first time I have started a consumer it was for a topic with a replication_factor equal to 1. In that time, the consumer created only one replica. That topic was not anymore updated and then the other brokers were not able to see it if broker 0 was going down.

Solution

It is possible to establish a new replication level per partition by the command bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new_reassignment.json --execute

The new_reassignment.json json file used there has the following content

{"version":1,"partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

At this point, it is possible to manage the failure of any broker with sucess.

__consumer_offsets -- extra info

The number of partitions in this particular topic is set per default to 50. The consumer triggers its creation. The topic is mainly used by the consumer to commit the received message for each topic:partion. If this __consumer_offsets topic is not visible to the other brokers, the brokers will not have a way to inform the consumer to consume new messages.

查看更多
登录 后发表回答