create `KafkaServer` from Java

2019-07-27 10:42发布

问题:

I am trying to start a Kafka server form Java

Specifically, how can I translate this line of Scala into a line of Java?

  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

I can create the serverConfig easily, but I can't seem to be able to create the kafkaMetricsReporters parameter.

Note: I can create a KafkaServerStartable but I would like to create a normal KafkaServer to avoid the JVM exiting in case of error.

Apache Kafka version 0.11.0.1

回答1:

The kafkaMetricsReporters parameter is a scala Seq.

You can either:

  1. Create a Java collection and convert it into a Seq:

    You need to import scala.collection.JavaConverters:

    List<KafkaMetricsReporter> reportersList = new ArrayList<>();
    ...
    Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala();
    
  2. Use KafkaMetricsReporter.startReporters() method to create them for you from your configuration:

    As KafkaMetricsReporter is a singleton, you need to use the MODULE notation to use it:

    Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
    

Also the KafkaServer constructor has 2 other arguments that are required when calling it from Java:

  • time can easily be created using new org.apache.kafka.common.utils.SystemTime()
  • threadNamePrefix is an Option. If you import scala.Option, you'll be able to call Option.apply("prefix")

Putting it all together:

Properties props = new Properties();
props.put(...);
KafkaConfig config = KafkaConfig.fromProps(props);
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
server.startup();