弗林克+卡夫卡,java.lang.OutOfMemoryError当并行> 1(Flink

2019-09-27 23:53发布

我有一个玩具弗林克工作从3个卡夫卡主题读取,然后联合所有这3个数据流。 这一切,没有额外的工作。

如果使用并行1我弗林克的工作,似乎一切都很好,因为SOOS我改变并行> 1时,出现:

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)

为什么它的工作原理与并行1,但不具有并行> 1?

这是否与卡夫卡服务器端设置? 或者它(但在我的代码没有什么特别的配置)相关comsumer设置在我的Java代码?

我知道这里proviced的信息可能是不够的,但我不能触摸到卡夫卡集​​群。 我只是希望有些大师可能发生之前遇到同样的错误,并能与我分享一些建议。

我使用的是卡夫卡0.10,弗林克1.5。

非常感谢。

Answer 1:

正如你可以在错误日志中看到此错误是从卡夫卡集群。 当卡夫卡经纪人的直接缓冲内存超过分配给JVM 堆大小会发生此问题。 所要求的应用程序的直接缓冲存储器由JVM的堆中分配。 当使用并行> 1时,多个弗林克任务, 分钟(弗林克槽孔,卡夫卡分区数的次数)将在同一时间消耗来自卡夫卡数据,导致更多的使用卡夫卡经纪人相比堆大小时并行等于一并会发生所谓的错误。 的标准溶液是通过将可变KAFKA_HEAP_OPTS卡夫卡ENV文件或作为OS环境变量 ,以增加可用的堆大小以卡夫卡经纪人。 例如,添加以下行到堆大小设置为2 GB:

export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"

但是,在你的情况这有卡夫卡经纪人无权访问(根据你的问题),可以降低单个调用返回的记录的数量来轮询(),所以需要在经纪人堆内存将减少。 (这不是一个标准溶液,我建议刚消失的错误)。

从这样的回答 :

卡夫卡消费者通过以下两个参数处理数据积压,

max.poll.interval.ms
一项民意调查的调用之间的最大延迟(使用消费群管理时)。 这地方的上界,消费者可以获取更多的记录之前空闲的时间量。 如果民意调查()这个超时到期之前不叫,那么消费者被认为是失败,该集团将为了重新分配分区到另一个成员重新平衡。 默认值为30万。

max.poll.records
最大记录数在一个单一的调用返回的查询()。 默认值是500。

忽略按要求可能会导致,消费者可能无法利用现有资源来处理最大的数据轮询设置上述两个参数,导致内存不足或未能提交消费者在时间偏移。 因此,它始终是最好使用max.poll.records和max.poll.interval.ms参数。

所以对于一个测试,max.poll.records的价值降低到例如250,并检查是否会发生错误,但。

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer", Serializer);
properties.setProperty("value.deserializer", Deserializer);
properties.setProperty("max.poll.records", "250");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);


文章来源: Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1