Slow consumer throughput when using 2 consumer-configurations

Posted 2019-02-18 17:38

Question:

I am using the spring-integration-kafka extension with the following configuration:

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
    zk-session-timeout="10000" zk-sync-time="2000" />

<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
                group-id="realtime-services-consumer-grp" 
                value-decoder="purchaseDecoder" 
                key-decoder="kafkaReflectionDecoder"
                max-messages="5" >
            <int-kafka:topic id="purchase" streams="1" />
        </int-kafka:consumer-configuration>
        <int-kafka:consumer-configuration 
                group-id="realtime-services-consumer-gw"
                value-decoder="eventDecoder" 
                key-decoder="kafkaReflectionDecoder" 
                max-messages="10" >
            <int-kafka:topic id="event" streams="1" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
    auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

For example, when I comment out the first consumer-configuration, I get 300 events per minute with no issue. But when both are active, throughput is very low: the total from both topics is less than 50 messages per minute.

Does anyone know why I get such poor performance when reading from 2 topics? What did I do wrong in the configuration?

Answer 1:

Thank you for pointing this out!

After a big fight with my local Kafka cluster I was able to reproduce your issue, and I have a workaround for you :-).

First of all, the consumer configurations are not polled round-robin but one by one, in sequence:

for (final ConsumerConfiguration<K, V> consumerConfiguration : getConsumerConfigurations().values()) {
    Map<String, Map<Integer, List<Object>>> messages = consumerConfiguration.receive(); // may block up to consumer-timeout
    // ... (rest of the loop body omitted)
}

Each of those consumerConfiguration.receive() calls waits in the background for up to consumer-timeout="5000" when there are no messages in its KafkaStream at that moment. Hence the entire poll task of the <int-kafka:inbound-channel-adapter> is blocked until that timeout expires, so an idle topic holds back a busy one. Even worse, if none of the topics has messages, the total wait is the sum of the timeouts: with two topics and consumer-timeout="5000", a single poll can block for up to 10 seconds!
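To make the blocking behaviour concrete, here is a minimal, hypothetical Java model of that loop. It is not the spring-integration-kafka API: each topic is stood in for by a BlockingQueue, and the made-up pollOnce method waits up to the timeout on each empty queue in turn. It is also simplified to at most one message per topic per poll instead of max-messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of one poll cycle (not the spring-integration-kafka API):
// each "topic" is a BlockingQueue and each receive waits up to the timeout when empty.
public class SequentialPollDemo {

    // Polls every topic in turn, like the loop over consumer configurations above.
    static List<String> pollOnce(List<BlockingQueue<String>> topics, long consumerTimeoutMs)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (BlockingQueue<String> topic : topics) {
            // Waits up to consumerTimeoutMs if this topic has nothing to deliver
            String message = topic.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS);
            if (message != null) {
                received.add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> purchase = new LinkedBlockingQueue<>(); // idle topic
        BlockingQueue<String> event = new LinkedBlockingQueue<>();
        event.put("evt-1"); // the "event" topic has a message ready
        long timeoutMs = 500; // scaled down from consumer-timeout="5000" to keep the demo short

        long start = System.nanoTime();
        List<String> result = pollOnce(List.of(purchase, event), timeoutMs);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // The ready message is only returned after the idle topic's full timeout
        System.out.println("received=" + result + " elapsedMs=" + elapsedMs);
    }
}
```

With both queues empty, pollOnce takes at least two full timeouts; with only "purchase" empty, the ready "event" message still waits behind one full timeout.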

To work around the issue, you can either decrease consumer-timeout="5000" or declare a separate <int-kafka:consumer-context>, and therefore a separate <int-kafka:inbound-channel-adapter>, for each topic.
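The second workaround can be illustrated with a similarly hypothetical Java model, again not the spring-integration-kafka API. Each topic gets its own poll loop on its own thread, standing in for one <int-kafka:consumer-context> plus <int-kafka:inbound-channel-adapter> per topic, and all loops feed one shared queue named after the inputFromKafka channel. PerTopicPollDemo and startPollers are illustrative names only.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the workaround (not the spring-integration-kafka API):
// one independent poller per topic, all forwarding into a shared output queue.
public class PerTopicPollDemo {

    // Starts one poll loop per topic; each forwards received messages to output.
    static ExecutorService startPollers(List<BlockingQueue<String>> topics,
                                        BlockingQueue<String> output,
                                        long consumerTimeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(topics.size());
        for (BlockingQueue<String> topic : topics) {
            pool.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // An empty topic only blocks its own poller, never the others
                        String message = topic.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            output.put(message);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the loop
                }
            });
        }
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> purchase = new LinkedBlockingQueue<>(); // stays empty
        BlockingQueue<String> event = new LinkedBlockingQueue<>();
        BlockingQueue<String> inputFromKafka = new LinkedBlockingQueue<>();

        ExecutorService pool = startPollers(List.of(purchase, event), inputFromKafka, 5000);
        event.put("evt-1");

        // Delivered promptly even though the purchase poller is waiting on its 5 s timeout
        String delivered = inputFromKafka.poll(1, TimeUnit.SECONDS);
        System.out.println("delivered=" + delivered);
        pool.shutdownNow();
    }
}
```

The design choice is isolation: an idle topic still waits out its own timeout, but only on its own thread, so it no longer delays messages from the other topic.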

Yes, it looks odd, and it is really bad that we didn't find the time to look into this before the release, but feel free to raise a JIRA issue to get it fixed.

Thank you!