I want to read a Kafka topic with multiple partitions from Flink.
I have a Kafka cluster of 3 machines, with the following topic:
Topic:myTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: myTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
From Flink, I execute the following code:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
properties.setProperty("group.id", "flink");

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties));
stream.map(...);
env.execute();
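For completeness, here is a self-contained sketch of the whole job. The imports, the StreamExecutionEnvironment setup, the setParallelism(3) call (one subtask per partition), and the print() sink in place of the elided map are my additions, not part of my actual job:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadMyTopic {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One parallel subtask per partition of myTopic.
        env.setParallelism(3);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
        properties.setProperty("group.id", "flink");

        // A single consumer instance; Flink distributes the topic's
        // partitions across the source's parallel subtasks.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties));
        stream.print();

        env.execute("read myTopic");
    }
}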
I launch the same job 3 times.
If I execute this code with one broker it works well, but with 3 brokers (on 3 different machines) only one partition is read.
(In this question) the proposed solution was:
to create separate instances of the FlinkKafkaConsumer for each cluster (that's what you are already doing), and then union the resulting streams
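If I understand that suggestion correctly, it amounts to something like the following sketch (the two property sets and broker addresses are placeholders for two separate clusters, and I'm assuming string records):

Properties propsA = new Properties();
propsA.setProperty("bootstrap.servers", "clusterA-broker:9092");
propsA.setProperty("group.id", "flink");

Properties propsB = new Properties();
propsB.setProperty("bootstrap.servers", "clusterB-broker:9092");
propsB.setProperty("group.id", "flink");

// One FlinkKafkaConsumer09 per cluster, then union the resulting streams.
DataStream<String> streamA = env.addSource(
        new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), propsA));
DataStream<String> streamB = env.addSource(
        new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), propsB));

DataStream<String> combined = streamA.union(streamB);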
It's not working in my case.
So my questions are:

- Am I missing something?
- If we add a new machine to the Kafka cluster, do we need to change Flink's code to add a consumer for the new broker? Or can this be handled automatically at runtime?
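For what it's worth: as far as I know, bootstrap.servers is only used for the initial connection, so adding a broker to the cluster should not by itself require a code change. The Flink documentation also mentions dynamic partition discovery in newer versions (1.4+), which, if I read it right, would pick up new partitions at runtime. A sketch, assuming that applies here; the 10000 ms interval is an arbitrary choice of mine:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "x.x.x.x:9092");
properties.setProperty("group.id", "flink");
// Assumption (Flink 1.4+): enables discovery of new partitions at runtime.
properties.setProperty("flink.partition-discovery.interval-millis", "10000");

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties));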