Kafka Consumer does not receive messages

Posted 2020-03-06 13:54

Question:

I am a newbie to Kafka. I read many tutorials on the Internet about writing a Kafka Producer and a Kafka Consumer. I got the former working: it can send messages to the Kafka cluster. However, I could not get the latter to work. Please kindly help me solve this problem. My problem looks like some other posts on StackOverflow, but I want to describe it more clearly. I run Kafka and ZooKeeper on an Ubuntu server in VirtualBox, using the simplest (almost default) configuration with a single Kafka broker and a single ZooKeeper node.

1. When I use the Kafka command-line tools for both the producer and the consumer:

* Case 1: It works. I can see the message "Hello, World" on the screen.

$~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic helloKafka --partitions 1 --replication-factor 1
$echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
$~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TutorialTopic --from-beginning

2. When I use my Producer program and the Kafka command-line consumer:

* Case 2: It works. I can see the messages sent from the Producer on the screen.

$~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic helloKafka --partitions 1 --replication-factor 1
$~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TutorialTopic --from-beginning
$java -cp target/Kafka_Producer_Program-0.0.1-SNAPSHOT.jar AddLab_Producer

3. When I use my Producer and my Consumer programs:

* Case 3: Only the Producer works. The Consumer runs but does not show any messages.

$~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic helloKafka --partitions 1 --replication-factor 1
$java -cp target/Kafka_Producer_Program-0.0.1-SNAPSHOT.jar AddLab_Producer
$java -cp target/Kafka_Consumer_Program-0.0.1-SNAPSHOT.jar AddLab_Consumer

Here is my code for the Producer and the Consumer. Actually, I copied it from some Kafka tutorial websites.

*Producer program

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class AddLab_Producer {
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        boolean sync = false;
        String topic = args[0];
        String key = "mykey";

        for (int i = 1; i <= 3; i++) {
            String value = args[1] + " " + i;
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, value);
            if (sync) {
                producer.send(producerRecord).get();
            } else {
                producer.send(producerRecord);
            }
        }
        producer.close();
    }
}

*Consumer program

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AddLab_Consumer {

    public static class KafkaPartitionConsumer implements Runnable {

        private int tnum ;
        private KafkaStream kfs ;

        public KafkaPartitionConsumer(int id, KafkaStream ks) {
            tnum = id ;
            kfs = ks ;
        }   
        public void run() {
            // TODO Auto-generated method stub
            System.out.println("This is thread " + tnum) ;

            ConsumerIterator<byte[], byte[]> it = kfs.iterator();
            int i = 1;
            while (it.hasNext()) {
                System.out.println(tnum + " " + i + ": " + new String(it.next().message()));
                ++i;
            }
        }
    }

    public static class MultiKafka {    
        public void run() {
        }   
    }

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "mygroupid2");
        props.put("zookeeper.session.timeout.ms", "413");
        props.put("zookeeper.sync.time.ms", "203");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig cf = new ConsumerConfig(props) ;    
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;      
        String topic = "mytopic" ;     
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        ExecutorService executor = Executors.newFixedThreadPool(1);

        int threadnum = 0 ;     
        for(KafkaStream<byte[],byte[]> stream  : streams) { 
            executor.execute(new KafkaPartitionConsumer(threadnum,stream));
            ++threadnum ;
        }
    }
}

*My POM.xml file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>HelloJava</groupId>
    <artifactId>HelloJava</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

I really appreciate your help. Thank you very much.

Screenshot of the Consumer: it seems to run but does not receive any messages from the Producer.

Answer 1:

I encountered the same problem as you. After a long time trying, here is the answer.

With the new Kafka consumer API there are two ways to attach a consumer to a topic; you can choose either one:

consumer.assign(...)

consumer.subscribe(...)

And use them like this:

    // set these properties; otherwise you must start the consumer before you start the producer
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    boolean assign = false;
    if(assign) {
        TopicPartition tp = new TopicPartition(topic, 0);
        List<TopicPartition> tps = Arrays.asList(tp);
        consumer.assign(tps);
        consumer.seekToBeginning(tps);
    }else {
        consumer.subscribe(Arrays.asList(topic));
    }

http://kafka.apache.org/documentation.html#newconsumerconfigs
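
For reference, here is a minimal, self-contained sketch of a consumer built on the new API (the class name AddLab_NewConsumer is just a placeholder I made up; the broker address, group id and topic name are taken from the question). It combines the properties above with a simple poll loop:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AddLab_NewConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // the new consumer talks to the broker, not to ZooKeeper
        props.put("group.id", "mygroupid2");
        props.put("enable.auto.commit", "false");          // do not commit offsets automatically
        props.put("auto.offset.reset", "earliest");        // start from the beginning when there is no committed offset
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic"));      // or use assign()/seekToBeginning() as shown above

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}

Because auto-commit is disabled and the sketch never commits offsets, it simply re-reads the topic from the beginning every time it is started, which makes it easy to verify that messages really arrive.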

If you use the old (ZooKeeper-based) consumer API, the idea is the same, but the property names and values differ slightly: auto-commit is controlled by auto.commit.enable, and auto.offset.reset takes smallest/largest instead of earliest/latest. Remember to add the two following lines if you want to see messages that were produced before the consumer starts consuming:

props.put("auto.commit.enable", "false");
props.put("auto.offset.reset", "smallest");

Hope this will help other people.