Spring Integration Kafka adapter not producing messages

Published 2019-06-23 18:46

Question:

I have been struggling with this for days now.

I am using the Spring Integration adapter for Kafka in a Spring Boot container.

I have configured ZooKeeper and Kafka on my machine. I also created a console producer and consumer and tested them, and everything works fine (I manage to produce messages via the console producer and have the console consumer consume them).

Now I am trying to produce messages via the Spring Integration Kafka outbound channel adapter, but the console consumer won't consume the messages.

SI/Spring XD XML:

<int:publish-subscribe-channel id="inputToKafka"/>


    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        auto-startup="true"
                                        order="1"
                                        channel="inputToKafka">

    </int-kafka:outbound-channel-adapter>


    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              async="true"
                                              topic="zerg.hydra"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
</beans>

Java:

// Imports assume Spring Integration 4.x under Spring Boot, as used in this project.
import javax.inject.Named;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

@Named
public class KafkaProducer {

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel inputToKafka;


    public void sendMessageToKafka(String message)
    {
        inputToKafka.send(
                MessageBuilder.withPayload(message)
                        .setHeader("messageKey", "3")
                        .setHeader("topic", "zerg.hydra").build());

    }

}

This is how I run the Kafka console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning

logs:

Testing started at 12:49 PM ...
12:49:54 PM: Executing external tasks 'cleanTest test'...
:cleanTest
:compileJava
:processResources UP-TO-DATE
:classes
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
12:50:07,165 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
12:50:07,166 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
12:50:07,166 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/test/logback.xml]
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs multiple times on the classpath.
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/test/logback.xml]
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/main/logback.xml]
12:50:07,247 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
12:50:07,250 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
12:50:07,259 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [stdout]
12:50:07,281 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
12:50:07,327 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [reactor] to INFO
12:50:07,327 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.projectreactor] to INFO
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.springframework] to WARN
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.springframework.integration] to DEBUG
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
12:50:07,328 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [stdout] to Logger[ROOT]
12:50:07,331 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
12:50:07,332 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@3de433b4 - Registering current configuration as safe fallback point


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.1.9.RELEASE)

12:50:09.530 [Test worker] INFO  o.s.i.config.IntegrationRegistrar - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
12:50:09.544 [Test worker] DEBUG o.s.i.config.IntegrationRegistrar - SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
12:50:10.717 [Test worker] INFO  o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
12:50:10.719 [Test worker] INFO  o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
12:50:10.973 DEBUG [Test worker][org.jboss.logging] Logging Provider: org.jboss.logging.Log4jLoggerProvider
12:50:10.974 INFO  [Test worker][org.hibernate.validator.internal.util.Version] HV000001: Hibernate Validator 5.0.3.Final
12:50:10.991 DEBUG [Test worker][org.hibernate.validator.internal.engine.resolver.DefaultTraversableResolver] Cannot find javax.persistence.Persistence on classpath. Assuming non JPA 2 environment. All properties will per default be traversable.
12:50:11.006 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom MessageInterpolator of type org.springframework.validation.beanvalidation.LocaleContextMessageInterpolator
12:50:11.011 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ConstraintValidatorFactory of type org.springframework.validation.beanvalidation.SpringConstraintValidatorFactory
12:50:11.018 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ParameterNameProvider of type com.sun.proxy.$Proxy46
12:50:11.024 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] Trying to load META-INF/validation.xml for XML based Validator configuration.
12:50:11.032 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] No META-INF/validation.xml found. Using annotation based configuration only.
12:50:12.089 [Test worker] INFO  o.a.catalina.core.StandardService - Starting service Tomcat
12:50:12.091 [Test worker] INFO  o.a.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/7.0.56
12:50:14.162 [localhost-startStop-1] INFO  o.a.c.c.C.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
12:50:16.567 [Test worker] INFO  o.s.i.k.support.ProducerFactoryBean - Using producer properties => {metadata.broker.list=localhost:9092, compression.codec=0, producer.type=async}
12:50:17.036 INFO  [Test worker][kafka.utils.VerifiableProperties] Verifying properties
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property compression.codec is overridden to 0
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property metadata.broker.list is overridden to localhost:9092
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property producer.type is overridden to async
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.resolver.DefaultTraversableResolver] Cannot find javax.persistence.Persistence on classpath. Assuming non JPA 2 environment. All properties will per default be traversable.
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom MessageInterpolator of type org.springframework.validation.beanvalidation.LocaleContextMessageInterpolator
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ConstraintValidatorFactory of type org.springframework.validation.beanvalidation.SpringConstraintValidatorFactory
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ParameterNameProvider of type com.sun.proxy.$Proxy46
12:50:17.592 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] Trying to load META-INF/validation.xml for XML based Validator configuration.
12:50:17.592 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] No META-INF/validation.xml found. Using annotation based configuration only.
12:50:18.967 [Test worker] DEBUG o.s.i.c.GlobalChannelInterceptorProcessor - No global channel interceptors.
12:50:18.978 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {mongo:outbound-channel-adapter:mongoAdapter.adapter} as a subscriber to the 'mongoAdapter' channel
12:50:18.979 [Test worker] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.mongoAdapter' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started mongoAdapter.adapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {mongo:outbound-channel-adapter:adapterWithConverter.adapter} as a subscriber to the 'adapterWithConverter' channel
12:50:18.979 [Test worker] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.adapterWithConverter' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started adapterWithConverter.adapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {message-handler:kafkaOutboundChannelAdapter} as a subscriber to the 'inputToKafka' channel
12:50:18.979 [Test worker] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.inputToKafka' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started kafkaOutboundChannelAdapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
12:50:18.979 [Test worker] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.errorChannel' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started _org.springframework.integration.errorLogger
12:50:19.026 [Test worker] INFO  o.a.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8091"]
12:50:19.040 [Test worker] INFO  o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8091"]
12:50:19.047 [Test worker] INFO  o.a.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
12:50:19.338 [Test worker] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'inputToKafka', message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
12:50:19.338 [Test worker] DEBUG o.s.i.k.o.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
12:50:19.362 [Test worker] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'inputToKafka', message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
Empty test suite.
12:50:19.402 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {mongo:outbound-channel-adapter:mongoAdapter.adapter} as a subscriber to the 'mongoAdapter' channel
12:50:19.417 [Thread-4] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.mongoAdapter' has 0 subscriber(s).
12:50:19.419 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped mongoAdapter.adapter
12:50:19.420 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {mongo:outbound-channel-adapter:adapterWithConverter.adapter} as a subscriber to the 'adapterWithConverter' channel
12:50:19.422 [Thread-4] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.adapterWithConverter' has 0 subscriber(s).
12:50:19.422 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped adapterWithConverter.adapter
12:50:19.423 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {message-handler:kafkaOutboundChannelAdapter} as a subscriber to the 'inputToKafka' channel
12:50:19.424 [Thread-4] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.inputToKafka' has 0 subscriber(s).
12:50:19.424 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped kafkaOutboundChannelAdapter
12:50:19.425 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
12:50:19.426 [Thread-4] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.errorChannel' has 0 subscriber(s).
12:50:19.426 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped _org.springframework.integration.errorLogger
BUILD SUCCESSFUL
Total time: 25.469 secs
12:50:20 PM: External tasks execution finished 'cleanTest test'.

In the same app I tried using the official Kafka client producer, and it worked just fine:

// Imports assume the Kafka 0.8 producer API.
import java.util.Properties;

import javax.annotation.PostConstruct;
import javax.inject.Named;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@Named
public class KafkaProducerJava {

    ProducerConfig config = null;
    Producer<String, String> producer;
    Properties props = null;

    @PostConstruct
    public void init()
    {
        props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
       // props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");



    }


    public void sendMsgToKafka(String msg)
    {
        config= new ProducerConfig(props);
        producer=new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "", msg);

        producer.send(data);
        producer.close();
    }
}

Any idea why the message never reaches my consumer via the Spring Integration Kafka adapter?

Answer 1:

I just ran it in XD and it worked fine for me...

$ bin/xd-shell
 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
eXtreme Data
1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name foo --definition "time | kafka --topic=test" --deploy
Created and deployed new stream 'foo'
xd:>stream destroy foo
Destroyed stream 'foo'
xd:>


$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
2014-11-26 10:03:09
2014-11-26 10:03:10
2014-11-26 10:03:11
2014-11-26 10:03:12
2014-11-26 10:03:13

I also wrote this test case...

// Imports assume the spring-integration-kafka 1.x API shown in the question's logs.
import java.util.Collections;

import org.junit.Test;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.KafkaProducerContext;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.support.MessageBuilder;

import kafka.serializer.Encoder;
import kafka.serializer.StringEncoder;

public class OutboundTests {

    @Test
    public void test() throws Exception {
        KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
        ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>("test");
        producerMetadata.setValueClassType(String.class);
        producerMetadata.setKeyClassType(String.class);
        Encoder<String> encoder = new StringEncoder<String>();
        producerMetadata.setValueEncoder(encoder);
        producerMetadata.setKeyEncoder(encoder);
        ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, "localhost:9092");
        ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
        kafkaProducerContext.setProducerConfigurations(Collections.singletonMap("test", config));
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<String, String>(kafkaProducerContext);
        handler.handleMessage(MessageBuilder.withPayload("foo")
                .setHeader("messagekey", "3")
                .setHeader("topic", "test")
                .build());
    }

}

...to simulate what you are doing and that worked too. Are you seeing any logged exceptions? I see you are not setting the key/value types and encoders.

EDIT:

As discussed in the comments, the issue is that you are using an async producer. In your test example that "works", you are closing the producer, which flushes the queue. By default, the queue won't be flushed for 5 seconds and your test case is not waiting long enough.

I updated my test to include an XML-configured version, reduced queue.buffering.max.ms to 500ms, and added code to the test to wait a couple of seconds before terminating.

See the new commit for details.
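To make the timing behavior concrete, here is a minimal sketch using the plain Kafka 0.8 producer API already shown in the question (the class name, topic, and the 500ms value are only illustrative assumptions): with producer.type=async the message sits in a client-side queue until queue.buffering.max.ms elapses or the producer is closed, so a test that exits immediately can finish before anything is actually sent to the broker.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AsyncFlushSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");        // same async mode the adapter was configured with
        props.put("queue.buffering.max.ms", "500"); // flush the async queue after 500ms instead of the default

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("zerg.hydra", "3", "Hello Kafka From SI"));

        // Give the async queue time to flush before the JVM exits;
        // closing the producer also flushes any buffered messages.
        Thread.sleep(2000);
        producer.close();
    }
}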



Answer 2:

I was facing the same issue since this morning and finally found the solution. Prefix the topic header name with kafka_, as given below:

.setHeader("kafka_topic", "zerg.hydra")

This is an issue with the API: internally, when looking up the key from the message header, kafka_ gets prefixed to the header name.
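Applied to the question's producer, the send method would then look roughly like the sketch below. Only the kafka_topic header is confirmed by this answer; the class name is illustrative, and using kafka_messageKey for the message key is an assumption based on the same prefixing convention.

import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

public class KafkaProducerWithPrefixedHeaders {

    private final MessageChannel inputToKafka;

    public KafkaProducerWithPrefixedHeaders(MessageChannel inputToKafka) {
        this.inputToKafka = inputToKafka;
    }

    public void sendMessageToKafka(String message) {
        inputToKafka.send(
                MessageBuilder.withPayload(message)
                        // "kafka_messageKey" assumes the same kafka_ prefix;
                        // only "kafka_topic" is confirmed by this answer.
                        .setHeader("kafka_messageKey", "3")
                        .setHeader("kafka_topic", "zerg.hydra")
                        .build());
    }
}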