为什么不是这个卡夫卡消费者关停?(Why isn't this Kafka consumer

2019-10-22 12:31发布

我期待的消耗测试读取一条消息和关机。 然而,这是没有的,连后我打电话的consumer.shutdown() 想知道为什么吗?

测试

public class AppTest
{
    App app=new App();

    @org.junit.Test
    public void publish()
    {

        assertTrue(app.publish("temptopic","message"));
    }

    @org.junit.Test
    public void consume()
    {
        app.publish("temptopic","message");
        assertEquals("message",app.consume("temptopic","tempgroup"));
    }
}

在测试类

public class App 
{
    public boolean publish(String topicName, String message) {

        Properties p=new Properties();
        p.put("bootstrap.servers","localhost:9092");
        p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        p.put("request.required.acks","1");

        KafkaProducer producer = new KafkaProducer<String,String>(p);

        ProducerRecord record=new ProducerRecord(topicName,"mykey",message);

        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata recordMetadata=null;
        try {
            recordMetadata = future.get();
        } catch (Exception e) {
            return false;
        }

        return true;
    }

    public String consume(String topicName, String groupId)
    {
        Properties p=new Properties();
        p.put("zookeeper.connect","localhost:2181");
        p.put("group.id","groupId");
        p.put("zookeeper.session.timeout.ms","500");
        p.put("zookeeper.sync.time.ms","250");
        p.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer=null;
        String result = "";

        try
        {
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));

            Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
            topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);

            List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);


            for (KafkaStream stream : streams) {
                ConsumerIterator it = stream.iterator();
                while (it.hasNext()) {
                    MessageAndMetadata msg = it.next();
                    String strMsg = new String((byte[]) msg.message());
                    System.out.println("received: " + strMsg);
                    result += strMsg;
                }
            }
        }
        finally
        {
            if (consumer != null)
            {
                consumer.shutdown();
            }
        }
        return result;
    }
}

Answer 1:

该ConsumerIterator将默认块无限期只是等待下一条消息。 为了把它弄出来while循环,你会需要中断线程。

有消费属性“consumer.timeout.ms”,它可以被设置为在这之后,超时会抛出异常给消费者,但也许这不是一个单元测试是最实用的解决方案的值。



文章来源: Why isn't this Kafka consumer shutting down?