I am working with the C++ Kafka client librdkafka (the example I started from is https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp). My program writes 2,000,000 messages to the broker, and during this process I restart the broker. Sometimes no messages fail to be delivered; sometimes about 100,000 messages fail. Since queue.buffering.max.messages=100000, it looks as if everything sitting in the out queue was lost? The error reported is RdKafka::Message delivery report: Local: Unknown partition.
I have since found new problems: (1) sometimes about 200 messages are sent to the broker twice; (2) sometimes a message was already sent to the broker, yet dr_cb() is called and reports that this message failed to be delivered. I am trying to figure out whether the problem is in the broker or in the client. Has anyone seen similar problems? I need reliable transmission and delivery reports between the client and the broker. I am considering using the C client now, but I am not sure whether the same problem would happen there...
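For reference, here is a minimal sketch of the produce loop and delivery-report callback involved (the CountingDeliveryCb class and its counters are placeholders I use here just to tally the reports; the topic name and broker address match my setup below):

```cpp
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Placeholder callback that just tallies delivery outcomes.
class CountingDeliveryCb : public RdKafka::DeliveryReportCb {
 public:
  long delivered = 0, failed = 0;
  void dr_cb(RdKafka::Message &message) {
    if (message.err() == RdKafka::ERR_NO_ERROR)
      delivered++;
    else {
      failed++;
      std::cerr << "Delivery failed: " << message.errstr() << std::endl;
    }
  }
};

int main() {
  std::string errstr;
  CountingDeliveryCb dr;

  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("metadata.broker.list", "localhost:9092", errstr);
  // Leave delivery.report.only.error unset here so successes are counted too.
  conf->set("dr_cb", &dr, errstr);

  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  RdKafka::Topic *topic =
      RdKafka::Topic::create(producer, "test", NULL, errstr);

  std::string payload = "test message";
  for (int i = 0; i < 2000000; i++) {
    // produce() only enqueues; ERR__QUEUE_FULL means the local out queue
    // (queue.buffering.max.messages) is exhausted, so poll and retry.
    while (producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                             RdKafka::Producer::RK_MSG_COPY,
                             const_cast<char *>(payload.data()),
                             payload.size(), NULL, NULL) ==
           RdKafka::ERR__QUEUE_FULL)
      producer->poll(100);
    producer->poll(0);  // serve delivery-report callbacks
  }
  while (producer->outq_len() > 0)  // drain the out queue before exiting
    producer->poll(100);

  std::cerr << "delivered=" << dr.delivered
            << " failed=" << dr.failed << std::endl;
  delete topic;
  delete producer;
  return 0;
}
```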
The broker log is:
[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.immutable.Set$Set1.map(Set.scala:73)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)
[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)
[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
My producer configuration is:
Global config
client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
Topic config
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0
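For completeness, the reliability-related values above are set through the C++ Conf API roughly like this (a sketch; the set_or_die helper is just shorthand for this post):

```cpp
#include <cstdlib>
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Shorthand helper for this post: abort on any misspelled/invalid property.
static void set_or_die(RdKafka::Conf *conf, const std::string &name,
                       const std::string &value) {
  std::string errstr;
  if (conf->set(name, value, errstr) != RdKafka::Conf::CONF_OK) {
    std::cerr << name << ": " << errstr << std::endl;
    std::exit(1);
  }
}

int main() {
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  set_or_die(conf, "metadata.broker.list", "localhost:9092");
  set_or_die(conf, "queue.buffering.max.messages", "100000");
  set_or_die(conf, "message.send.max.retries", "10");
  set_or_die(conf, "retry.backoff.ms", "100");
  set_or_die(conf, "delivery.report.only.error", "true");
  set_or_die(tconf, "request.required.acks", "1");    // only the leader acks
  set_or_die(tconf, "message.timeout.ms", "300000");  // give up after 5 min
  // ... create the Producer/Topic from conf and tconf as in rdkafka_example.cpp ...
  return 0;
}
```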
The consumer output is:
[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
Any suggestions are welcome. Thanks.
In async mode, the client has to handle this kind of problem itself. I see no way to guarantee that the messages in the out queue reach the broker with 100% probability. What we can do is make sure each message gets into the out queue, and re-enqueue it if delivery fails: when a delivery fails, dr_cb() is called, and inside that callback I put the message into the out queue again. Maybe this is not the best way, but it is what I am doing now.
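Concretely, the re-enqueue-on-failure idea looks roughly like this (a sketch; the RetryDeliveryCb name and the raw producer/topic pointers are placeholders, and a real version should bound the retry count, e.g. via msg_opaque):

```cpp
#include <iostream>
#include <librdkafka/rdkafkacpp.h>

// Placeholder callback: when a delivery report signals failure, put the
// same payload back into the out queue with produce().
class RetryDeliveryCb : public RdKafka::DeliveryReportCb {
 public:
  RdKafka::Producer *producer;
  RdKafka::Topic *topic;

  void dr_cb(RdKafka::Message &message) {
    if (message.err() == RdKafka::ERR_NO_ERROR)
      return;  // delivered; nothing to do

    std::cerr << "Delivery failed (" << message.errstr()
              << "), re-enqueueing" << std::endl;
    // RK_MSG_COPY: the payload owned by the failed message is copied
    // before that message is destroyed when dr_cb() returns.
    RdKafka::ErrorCode err = producer->produce(
        topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
        message.payload(), message.len(), NULL /* key */, NULL /* opaque */);
    if (err != RdKafka::ERR_NO_ERROR)
      std::cerr << "Re-enqueue failed: " << RdKafka::err2str(err) << std::endl;
  }
};
```

Note that this gives at-least-once delivery at best: together with message.send.max.retries, a retry after a lost acknowledgement is indistinguishable from a new message on the broker side, which would also explain the duplicates described above, so de-duplication has to happen on the consumer.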