I am trying to start a Kafka server form Java
Specifically, how can I translate this line of Scala into a line of Java?
private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
I can create the serverConfig easily, but I can't seem to be able to create the kafkaMetricsReporters
parameter.
Note: I can create a KafkaServerStartable
but I would like to create a normal KafkaServer
to avoid the JVM exiting in case of error.
Apache Kafka version 0.11.0.1
The kafkaMetricsReporters
parameter is a scala Seq
.
You can either:
Create a Java collection and convert it into a Seq:
You need to import scala.collection.JavaConverters
:
List<KafkaMetricsReporter> reportersList = new ArrayList<>();
...
Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala();
Use KafkaMetricsReporter.startReporters()
method to create them for you from your configuration:
As KafkaMetricsReporter
is a singleton, you need to use the MODULE
notation to use it:
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
Also the KafkaServer
constructor has 2 other arguments that are required when calling it from Java:
time
can easily be created using new org.apache.kafka.common.utils.SystemTime()
threadNamePrefix
is an Option. If you import scala.Option
, you'll be able to call Option.apply("prefix")
Putting it all together:
Properties props = new Properties();
props.put(...);
KafkaConfig config = KafkaConfig.fromProps(props);
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
server.startup();