WSO2 Inbound endpoint - Kafka consumer

Posted 2019-06-14 12:27

I am creating a Kafka consumer.

I installed ZooKeeper and Kafka by following this article: https://dzone.com/articles/running-apache-kafka-on-windows-os

I am configuring the Kafka inbound endpoint by following this documentation: https://docs.wso2.com/display/EI611/Kafka+Inbound+Protocol

When I try to deploy the inbound endpoint, I get this error:

[2017-09-22 12:19:06,161] [] ERROR - KAFKAPollingConsumer  Error in Creating Kafka Consumer Connector
[2017-09-22 12:19:08,150] []  INFO - KAFKAMessageListener Creating Kafka Consumer Connector...

[2017-09-22 12:19:08,152] [] ERROR - KAFKAMessageListener Error in Creating Kafka Consumer Connector.Exception while loading Zookeeper JAAS login context 'Client' org.apache.kafka.common.KafkaException: Exception while loading Zookeeper JAAS login context 'Client'

        at org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:43)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:197)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:70)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:123)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAMessageListener.createKafkaConsumerConnector(KAFKAMessageListener.java:56)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.poll(KAFKAPollingConsumer.java:145)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.execute(KAFKAPollingConsumer.java:116)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKATask.taskExecute(KAFKATask.java:48)
        at org.wso2.carbon.inbound.endpoint.common.InboundTask.execute(InboundTask.java:45)
        at org.wso2.carbon.mediation.ntask.NTaskAdapter.execute(NTaskAdapter.java:98)
        at org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.SecurityException: java.io.IOException: C:\WS02\WSO2EI~1.1\bin\..\repository\conf\identity\jaas.conf (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
        at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
        at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
        at org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:40)
        ... 19 more

Caused by: java.io.IOException: C:\WS02\WSO2EI~1.1\bin\..\repository\conf\identity\jaas.conf (No such file or directory)

        at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
        at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)

1 answer
聊天终结者
#2 · 2019-06-14 13:03

If you need to configure Kafka without security, the recommended version is 0.8.1.1.

From version 0.9 onwards, Kafka includes a security mechanism based on SSL and SASL, and SASL is configured via JAAS. So when you use Kafka with WSO2 products (EI, DAS, CEP), you need to add this config file (jaas.conf) to /repository/conf/identity.

In this case the WSO2 product acts as the Kafka client, and the configuration in jaas.conf is used to log in to the Kafka server. Please follow the Kafka security documentation [1] to configure security for both the server and the client.

Example content for a jaas.conf file configured for SASL/PLAIN:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="kafka-secret"
   user_kafka="kafka-secret"
   user_ibm="ibm-secret";
};

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="kafka-secret";
};
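
Since the error in the question comes down to a missing jaas.conf, it can help to check the file before restarting the server. The following is only a minimal sketch in Python, not part of WSO2 or Kafka: the function names (jaas_sections, check_jaas) are made up for illustration, and the path you pass in is whatever your stack trace reports. It checks that the file exists and that it declares a KafkaClient section, using a simple regular expression rather than a full JAAS parser.

```python
import re
import tempfile
from pathlib import Path

# Matches a JAAS section header such as "KafkaClient {" at the start of a line.
# This is a rough pattern for illustration, not a complete JAAS grammar.
SECTION_HEADER = re.compile(r"^\s*([A-Za-z_][\w.\-]*)\s*\{", re.MULTILINE)


def jaas_sections(path):
    """Return the section names declared in a JAAS file, or None if the file is missing."""
    p = Path(path)
    if not p.is_file():
        return None
    return SECTION_HEADER.findall(p.read_text(encoding="utf-8"))


def check_jaas(path, required="KafkaClient"):
    """Return a short diagnosis string for the JAAS file at the given path."""
    sections = jaas_sections(path)
    if sections is None:
        return f"missing: {path}"
    if required not in sections:
        return f"no {required} section (found: {sections})"
    return "ok"


if __name__ == "__main__":
    # Hypothetical sample content, mirroring the client section shown above.
    sample = (
        "KafkaClient {\n"
        "   org.apache.kafka.common.security.plain.PlainLoginModule required\n"
        '   username="kafka"\n'
        '   password="kafka-secret";\n'
        "};\n"
    )
    with tempfile.TemporaryDirectory() as tmp:
        conf = Path(tmp) / "jaas.conf"
        print(check_jaas(conf))  # the file has not been created yet
        conf.write_text(sample, encoding="utf-8")
        print(check_jaas(conf))  # the file now declares KafkaClient
```

To use it against a real installation, call check_jaas with the exact jaas.conf path from the "No such file or directory" message in your own log.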

Please note that Kafka 0.9 supports only SASL/Kerberos (GSSAPI) authentication; SASL/PLAIN, as used in the example above, was added in Kafka 0.10.0. Carefully follow the Kafka security documentation for your Kafka version and configure it accordingly.

[1] http://kafka.apache.org/090/documentation.html#security_sasl
