Flink Kafka consumer groupId not working

Posted 2019-05-09 06:40

Question:

I am using Kafka with Flink. In a simple program, I used Flink's FlinkKafkaConsumer09 and assigned a group id to it.

According to Kafka's behavior, when I run 2 consumers on the same topic with the same group.id, it should work like a message queue. I expect it to work like this: if 2 messages are sent to Kafka, each message is processed by only one of the two Flink programs, so the 2 messages are processed twice in total (say, 2 lines of output overall).

But the actual result is that each program receives both messages.

I tried the consumer client that comes with the Kafka server download. It worked in the documented way (2 messages processed).
I tried using 2 Kafka consumers in the same main function of one Flink program. 4 messages were processed in total.
I also tried running 2 instances of Flink, each with the same Kafka consumer program. Again, 4 messages.

Any ideas? This is the output I expect:

1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66 

Here's the wrong output I always get:

1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66 

And here is the segment of code:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    // group.id and the other Kafka settings are taken from the command-line arguments
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(
            parameterTool.getRequired("topic"),
            new SimpleStringSchema(),
            parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink1 says: " + value;
        }
    }).print();


    env.execute();
}

I have tried running it twice, and also another way: creating 2 data streams in the main function and calling env.execute() for each one.

Answer 1:

There was a very similar question on the Flink user mailing list today, but I can't find the link to post here. So here is part of the answer:

"Internally, the Flink Kafka connectors don’t use the consumer group management functionality because they are using lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control on individual partition consumption. So, essentially, the “group.id” setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers."

Maybe that clarifies things for you.

Also, there is a blog post about working with Flink and Kafka that may help you (https://data-artisans.com/blog/kafka-flink-a-practical-how-to).
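
To make the quoted explanation concrete, here is a toy model in plain Java with no Kafka or Flink dependency. It is only a sketch: the class and method names are hypothetical, and a simple round-robin stands in for Kafka's real group assignors. It contrasts group-coordinated assignment (what a shared group.id gives ordinary Kafka consumers) with each job assigning partitions to itself, which is roughly what the quote says the Flink connector does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not Kafka or Flink code.
class PartitionAssignmentSketch {

    // Group-managed style: the partitions are divided among the group members,
    // so every partition ends up with exactly one reader.
    // Round-robin is a simplification of Kafka's real assignors.
    static Map<String, List<Integer>> groupManaged(List<String> members, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    // Manual style: each job picks its partitions on its own, with no
    // coordination between jobs, so two independent jobs both read everything.
    static Map<String, List<Integer>> manuallyAssigned(List<String> jobs, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String job : jobs) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                all.add(p);
            }
            result.put(job, all);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("Flink1", "Flink2");
        System.out.println(groupManaged(jobs, 2));     // {Flink1=[0], Flink2=[1]}
        System.out.println(manuallyAssigned(jobs, 2)); // {Flink1=[0, 1], Flink2=[0, 1]}
    }
}
```

In the second case both jobs own every partition, which matches the 4 lines of output in the question: sharing a group.id does not split the work between two separate Flink jobs.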



Answer 2:

Since group.id in the Flink Kafka consumer is of little use other than for committing offsets to ZooKeeper, is there any way to monitor offsets for a Flink Kafka consumer? I can see there is a way for console consumers (with the help of consumer-groups / consumer-offset-checker), but not for Flink Kafka consumers.

We want to see how far our Flink Kafka consumer is lagging behind the Kafka topic size (the total number of messages in the topic at a given point in time). Having it at the partition level is fine.
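
For reference, the per-partition lag described here is just the partition's latest (end) offset minus the offset committed under the job's group.id. The sketch below shows only that arithmetic, assuming the job really does commit its offsets. The class name, method name, and sample numbers are hypothetical, and how the two offset maps are fetched from Kafka is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the lag arithmetic only; names and numbers are hypothetical.
class LagSketch {

    // lag(partition) = end offset - committed offset, never below zero.
    // Assumption: a partition with no committed offset counts as lagging
    // from offset 0, which may overstate lag if old messages were deleted.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 120L);
        endOffsets.put(1, 95L);

        Map<Integer, Long> committedOffsets = new HashMap<>();
        committedOffsets.put(0, 100L);
        committedOffsets.put(1, 95L);

        System.out.println(lagPerPartition(endOffsets, committedOffsets)); // {0=20, 1=0}
    }
}
```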