关闭卡夫卡连接(Closing a Kafka connection)

2019-10-21 05:52发布

我有一个要发送的邮件数量有限卡夫卡,然后退出应用程序。 出于某种原因,卡夫卡连接熬夜,即使我关闭生产。 我实现(斯卡拉)是多还是少

object Kafka {
  private val props = new Properties()

  props.put("compression.codec", DefaultCompressionCodec.codec.toString)
  props.put("producer.type", "sync")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("batch.num.messages", "200")
  props.put("message.send.max.retries", "3")
  props.put("request.required.acks", "-1")
  props.put("client.id", "myclient")

  private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
  private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)

  def send(msg: Message) = Try(producer.send(encode(msg)))
  def close() = producer.close()
}

这里Message是一个简单的案例类,以及如何我将其转换为字节数组是不是真的有关。

这些消息做到达,但是当我最终调用Kafka.close()应用程序不会退出,并且连接似乎并没有被释放。

有没有一种办法明确要求卡夫卡终止连接?

Answer 1:

DEF的close()= producer.close()

这将创建一个名为“关闭”功能调用producer.close()我看不出有什么证据证明你的代码实际上关闭生产。

您需要只要致电:producer.close



文章来源: Closing a Kafka connection