I'm testing kafka a little, and hopefully am going to put it in my production stack soon.
I'm using the files kafka-console-producer.sh
and kafka-console-consumer.sh
to test kafka's functionality.
I created a topic with 2 partitions, but I see that all the messages my producer is sending are going to one partition.
I searched this on the internet and read that there is a setting called topic.metadata.refresh.interval.ms
that tells the producer to change the partition it's currently writing to, and that the default is 10 min.
I'm trying to change this setting, but I don't understand where... ?
Kafka properties can be found in the following three files
server.properties, producer.properties, consumer.properties
These files will be available in the folder kafka-folder/config/
. By default some properties will be available in those file. You can add what ever the properties you want. The list of properties is given in this link https://kafka.apache.org/08/configuration.html. This link contains detail explanation of all the properties for server, producer and consumer.
Since topic.metadata.refresh.interval.ms
is a producer configuration, you need to add this property in the producer.properties
file
Note the above link is the property configurations for the kafka-0.8 version.
You can also set it up inside a producer class:
Properties props = new Properties();
props.put("topic.metadata.refresh.interval.ms", "10");
ProducerConfig config = new ProducerConfig(props);
I understand that you are referring to the Java client and Jaya's answer is sufficient. In case you are playing with the configuration of client built on top of librdkafka
then the configuration can be slightly different:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
metadata.max.age.ms
:
Metadata cache max age. Defaults to metadata.refresh.interval.ms * 3. Type: integer
topic.metadata.refresh.interval.ms
:
Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the intervalled refresh. Type: integer