How can we create a topic in Kafka from the IDE using the API?

Posted 2019-01-03 22:38

How can we create a topic in Kafka from the IDE using the API? When I run this:

bin/ --topic mytopic --replica 3 --zookeeper localhost:2181

I get the error:

bash: bin/ No such file or directory

I followed the developer setup instructions exactly.

Reply 2 · 2019-01-03 23:23

There are a few reasons your call might not work:

  1. Your Kafka cluster doesn't have enough brokers to support a replication factor of 3.

  2. Your ZooKeeper setup uses a chroot path prefix, in which case you have to append it after the ZooKeeper port.

  3. You aren't in the Kafka install directory when running the command. (This is the most likely.)
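The first two points can be checked before any topic-creation call is made. The following is a minimal sketch in plain Java, assuming you already know the number of live brokers and the chroot path; the class and method names are hypothetical and are not part of the Kafka API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight helper; none of these names come from Kafka itself.
final class TopicPreflight {

    // ZooKeeper expects the chroot once, after the last host:port pair,
    // for example "zk1:2181,zk2:2181/kafka".
    static String zkConnect(List<String> hostPorts, String chroot) {
        String hosts = String.join(",", hostPorts);
        if (chroot == null || chroot.isEmpty() || chroot.equals("/")) {
            return hosts;
        }
        return hosts + (chroot.startsWith("/") ? chroot : "/" + chroot);
    }

    // A topic cannot have more replicas than there are live brokers.
    static boolean replicationFits(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }

    public static void main(String[] args) {
        System.out.println(zkConnect(Arrays.asList("localhost:2181"), "/kafka"));
        System.out.println(replicationFits(3, 1));
    }
}
```

With a single local broker, `replicationFits(3, 1)` is false, which matches the first failure case listed above.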

Reply 3 · 2019-01-03 23:25

In Kafka 0.8.1+ -- the latest version of Kafka as of today -- you can programmatically create a new topic via AdminUtils. The functionality of CreateTopicCommand (part of the older Kafka 0.8.0) that was mentioned in one of the previous answers to this question was moved to AdminUtils.

Scala example for Kafka 0.8.1:

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
  ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

Build dependencies, using sbt as example:

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % ""
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
)

EDIT: Added Java example for Kafka (latest version as of Jan 2016).

Maven dependencies:



import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  }

}

EDIT 2: Added Java example for Kafka (latest version as of April 2017).

Maven dependencies:



import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  }

}

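The `topicConfig` object in the examples above is where per-topic overrides go. As a small sketch, assuming the topic-level keys `retention.ms` and `cleanup.policy` and purely illustrative values (the class and method names are hypothetical), it could be filled like this before being passed to `createTopic`:

```java
import java.util.Properties;

// Hypothetical helper that prepares per-topic overrides.
final class TopicConfigExample {

    // Keep messages for seven days and delete old segments rather than compacting them.
    static Properties sevenDayRetention() {
        Properties topicConfig = new Properties();
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;
        topicConfig.setProperty("retention.ms", Long.toString(sevenDaysMs));
        topicConfig.setProperty("cleanup.policy", "delete");
        return topicConfig;
    }

    public static void main(String[] args) {
        System.out.println(sevenDayRetention());
    }
}
```

An empty `Properties` object, as used in the examples above, simply leaves the topic on the broker defaults.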
Reply 4 · 2019-01-03 23:25

You can use the kafka.admin.CreateTopicCommand Scala class to create a topic from Java code by providing the necessary arguments.

String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
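arguments[7] = "test-topic-Biks";

// Hand the arguments to the command's entry point, as if it were run from the shell
CreateTopicCommand.main(arguments);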


NB: You should add the Maven dependencies for jopt-simple-4.5 and zkclient-0.1.
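Filling the array by index is easy to get wrong when a flag is added or removed. As a sketch of one alternative, the hypothetical helper below (not part of Kafka) assembles the same flag/value pairs from typed parameters; the flag names are taken from the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; it only builds the String[] and does not call Kafka.
final class CreateTopicArgs {

    static String[] build(String zookeeper, int replicas, int partitions, String topic) {
        if (replicas < 1 || partitions < 1) {
            throw new IllegalArgumentException("replicas and partitions must be at least 1");
        }
        List<String> args = new ArrayList<>();
        args.add("--zookeeper");
        args.add(zookeeper);
        args.add("--replica");
        args.add(Integer.toString(replicas));
        args.add("--partition");
        args.add(Integer.toString(partitions));
        args.add("--topic");
        args.add(topic);
        return args.toArray(new String[0]);
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", build("localhost:2181", 1, 1, "test-topic-Biks")));
    }
}
```

The resulting array can be passed wherever the hand-built `arguments` array is used.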

Reply 5 · 2019-01-03 23:27

The sample below is taken from the Kafka 0.8 Producer Example. It produces to a topic named page_visits, and the broker creates that topic automatically on first use if auto.create.topics.enable is set to true (the default) in the Kafka broker config file.

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            // Sending the first message triggers automatic topic creation on the broker
            producer.send(data);
        }
        producer.close();
    }
}
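The producer above names example.producer.SimplePartitioner as its partitioner.class, but that class is not shown. A minimal sketch of the idea, assuming the key is an IP address and the partition is chosen from its last octet: this is written as a plain static method with hypothetical names, not as an implementation of Kafka's partitioner interface.

```java
// Hypothetical stand-alone version of the partitioning rule.
final class IpPartitioning {

    // Maps an IP-style key such as "192.168.2.77" to a partition index
    // by taking the number after the last dot modulo the partition count.
    static int partitionFor(String ipKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int lastDot = ipKey.lastIndexOf('.');
        if (lastDot < 0 || lastDot == ipKey.length() - 1) {
            return 0; // no usable suffix, fall back to the first partition
        }
        return Integer.parseInt(ipKey.substring(lastDot + 1)) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("192.168.2.77", 4));
    }
}
```

Because all messages with the same IP key map to the same index, they end up in the same partition of page_visits.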
Reply 6 · 2019-01-03 23:27

Which IDE are you trying this from?

Please provide the complete path to the script. The following terminal commands will create a topic:

  1. cd kafka/bin
  2. ./ --topic test --zookeeper localhost:2181
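When the command is launched from an IDE, the working directory is usually the project folder, which is why a relative bin/ path fails. One way around that, sketched below under stated assumptions, is to resolve the script against the Kafka install directory and start it with ProcessBuilder. The KafkaScript class, the kafkaHome parameter and the scriptName parameter are hypothetical; pass the topic-creation script that ships with your Kafka version.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds a command that does not depend on the IDE's working directory.
final class KafkaScript {

    static ProcessBuilder command(Path kafkaHome, String scriptName, String... scriptArgs) {
        Path script = kafkaHome.resolve("bin").resolve(scriptName).toAbsolutePath();
        if (!Files.isRegularFile(script)) {
            // Fail early with the full path instead of a bare "No such file or directory"
            throw new IllegalArgumentException("Script not found: " + script);
        }
        List<String> cmd = new ArrayList<>();
        cmd.add(script.toString());
        cmd.addAll(Arrays.asList(scriptArgs));
        return new ProcessBuilder(cmd)
                .directory(kafkaHome.toFile()) // run from the install directory
                .inheritIO();                  // show the script's output in the IDE console
    }
}
```

Calling `.start().waitFor()` on the returned builder runs the script and returns its exit code.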
Reply 7 · 2019-01-03 23:29

As of Kafka 0.10.1 the ZKStringSerializer mentioned by Michael is private (for Scala). You can use the factory methods createZkClient or createZkClientAndConnection in ZkUtils.

Scala example for Kafka 0.10.1:

import kafka.utils.ZkUtils

val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
  "localhost:2181", sessionTimeoutMs, connectionTimeoutMs) 

Then just create the topic as Michael suggested:

import java.util.Properties

import kafka.admin.AdminUtils

val zkUtils = new ZkUtils(zkClient, zkConnection, false)
val numPartitions = 4
val replicationFactor = 1
val topicConfig = new Properties
val topic = "my-topic"
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)