Reading from multiple Kafka brokers with Flink

Posted 2019-06-10 15:19

Question:

I want to read from multiple Kafka brokers with Flink.

I have a Kafka cluster of 3 machines, with the following topic:

Topic:myTopic   PartitionCount:3    ReplicationFactor:1 Configs:
Topic: myTopic  Partition: 0    Leader: 2   Replicas: 2 Isr: 2
Topic: myTopic  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: myTopic  Partition: 2    Leader: 1   Replicas: 1 Isr: 1

From Flink I execute the following code :

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
properties.setProperty("group.id", "flink");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties));
stream.map(....);
env.execute();

I launch the same job 3 times.

If I execute this code with one broker it works well, but with 3 brokers (on 3 different machines) only one partition is read.

In this question, the proposed solution was:

to create separate instances of the FlinkKafkaConsumer for each cluster (that's what you are already doing), and then union the resulting streams

That is not working in my case.

So my questions are:

  1. Am I missing something?
  2. If we add a new machine to the Kafka cluster, do we need to change Flink's code to add a consumer for the new broker? Or can this be handled automatically at runtime?

Answer 1:

It seems you've misunderstood the concept of Kafka's distributed streams.

A Kafka topic consists of several partitions (3 in your case). Each consumer can consume one or more of these partitions. If you start 3 instances of your app with the same group.id, each consumer will indeed read from just one partition – Kafka distributes the partitions of a topic evenly across the consumers in a group, so it works out to one partition per consumer here.

I recommend reading more about this, especially the concept of consumer groups, in the Kafka documentation.
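To make the group assignment concrete, here is a small plain-Java sketch with no Kafka dependency. `assignPartitions` is a hypothetical helper that mimics a round-robin assignor; real Kafka assignment is done by the broker-side group coordinator and the configured assignor, but the balancing idea is the same:

```java
import java.util.*;

public class PartitionAssignmentDemo {
    // Hypothetical helper mimicking a round-robin assignor:
    // partition p goes to consumer (p % numConsumers).
    static Map<String, List<Integer>> assignPartitions(int numPartitions, int numConsumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.put("consumer-" + c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get("consumer-" + (p % numConsumers)).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 3 consumers in the same group: one partition each.
        System.out.println(assignPartitions(3, 3));
        // 3 partitions, 1 consumer: that single consumer reads all partitions.
        System.out.println(assignPartitions(3, 1));
    }
}
```

Running this prints `{consumer-0=[0], consumer-1=[1], consumer-2=[2]}` and then `{consumer-0=[0, 1, 2]}` – which is exactly why your 3 separate jobs each see only one partition.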

Anyway, FlinkKafkaConsumer09 can run in multiple parallel instances, each of which will pull data from one or more Kafka partitions. You don't need to worry about creating more instances of the consumer yourself: a single consumer instance can pull records from all of the partitions.

I have no idea why you're starting the job 3 times instead of once with parallelism set to 3. That would solve your problem.

DataStream<String> stream =
      env.addSource(new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties))
              .setParallelism(3);