In my case I need multiple topics, with each topic linked to multiple consumers. I want to set a consumer group for each topic. I did not find any method in the Kafka .NET client that lets me create a consumer group dynamically and link a topic to that consumer group. I am using Kafka 0.9.0. Please tell me whether I need to change a setting on the Kafka server or on ZooKeeper.
Question:
Answer 1:
I am not sure what you mean by "set a consumer group for each topic". If you start a new consumer group (i.e., start a consumer with the corresponding group ID), the consumer group decides which topic(s) it wants to consume, simply by subscribing to them. There is no special configuration for this, so a group is always created dynamically on topic subscription.
Update (.NET client):
I am not familiar with the .NET client. However, according to its GitHub page (github.com/Jroland/kafka-net), it seems consumer groups are not supported yet.
It does seem that you can use a whitelist to read only certain partitions, so you can distribute the load manually:
From https://github.com/Jroland/kafka-net#consumer-1:
If no whitelist is provided then all partitions will be consumed creating one KafkaConnection for each partition leader
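One way to distribute the load by hand is to give each consumer instance its own slice of the partition IDs and use that slice as its whitelist. The sketch below only computes the slices. `PartitionPlanner` and `AssignRoundRobin` are hypothetical names and not part of kafka-net, and the code assumes partition IDs run from 0 to `partitionCount - 1`. How the resulting list is handed to the consumer depends on the client version, so that part is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of kafka-net: splits partition IDs
// across a fixed number of consumer instances in round-robin order.
public static class PartitionPlanner
{
    // Returns the partition IDs that the instance at consumerIndex should whitelist.
    // Assumption: partition IDs are 0 .. partitionCount - 1.
    public static List<int> AssignRoundRobin(int partitionCount, int consumerCount, int consumerIndex)
    {
        if (partitionCount < 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        if (consumerCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(consumerCount));
        if (consumerIndex < 0 || consumerIndex >= consumerCount)
            throw new ArgumentOutOfRangeException(nameof(consumerIndex));

        var whitelist = new List<int>();
        // Every consumerCount-th partition, starting at this instance's index.
        for (int p = consumerIndex; p < partitionCount; p += consumerCount)
        {
            whitelist.Add(p);
        }
        return whitelist;
    }
}
```

With 6 partitions and 2 consumer instances, instance 0 would whitelist partitions 0, 2, 4 and instance 1 would whitelist 1, 3, 5. Unlike a real consumer group, nothing rebalances these slices when an instance stops, so the application has to handle that itself.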
Answer 2:
I built a quick prototype with Microsoft's .NET Kafka client (link below). I am not sure whether it solves your problem.
However, I highly recommend this library because it has many more features than kafka-net (e.g. it supports ZooKeeper for maintaining offsets, topic groups, etc.).
https://github.com/Microsoft/CSharpClient-for-Kafka
Sample Code
This sends 10 messages to Kafka and writes each message to the console when the consumer receives it.
// Standard namespaces used below; the Kafka client library's own
// namespaces (CSharpClient-for-Kafka) must be imported as well.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

static void Main(string[] args)
{
    // Run the consumer on a background task so the producer below can send to it.
    Task.Factory.StartNew(() =>
    {
        ConsumerConfiguration consumerConfig = new ConsumerConfiguration
        {
            AutoCommit = true,
            AutoCommitInterval = 1000,
            GroupId = "group1",   // the consumer group is chosen here, per consumer
            ConsumerId = "1",
            AutoOffsetReset = OffsetRequest.SmallestTime,
            NumberOfTries = 20,
            ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)
        };
        var consumer = new ZookeeperConsumerConnector(consumerConfig, true);

        // Topic name -> number of streams to create for that topic.
        var dictionaryMapping = new Dictionary<string, int>();
        dictionaryMapping.Add("topic1", 1);
        var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());
        var messageStream = streams["topic1"][0];

        foreach (var message in messageStream.GetCancellable(new CancellationToken()))
        {
            Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));
            // If you set AutoCommit to false, you can commit offsets yourself with:
            // consumer.CommitOffsets()
        }
    });

    var brokerConfig = new BrokerConfiguration()
    {
        BrokerId = 1,
        Host = "localhost",
        Port = 9092
    };
    var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
    config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
    config.ProducerRetries = 3;
    config.RequiredAcks = -1;
    var kafkaProducer = new Producer(config);

    byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
    var inputMessage = new Message(payloadData);
    var data = new ProducerData<string, Message>("topic1", inputMessage);

    // Send the same message 10 times.
    for (int i = 0; i < 10; i++)
    {
        kafkaProducer.Send(data);
    }

    // Keep the process alive so the consumer task can print what it receives.
    Console.ReadLine();
}
Hope this helps.