我使用的动物园管理员摆脱卡夫卡的数据。 在这里,我总是从去年偏移点的数据。 有什么方法来指定的偏移变老数据的时间?
有一个选项autooffset.reset。 它接受的最小或最大。 可有人请解释什么是最小和最大。 可以autooffset.reset在获得来自旧数据帮助抵消点,而不是最新的偏移点?
我使用的动物园管理员摆脱卡夫卡的数据。 在这里,我总是从去年偏移点的数据。 有什么方法来指定的偏移变老数据的时间?
有一个选项autooffset.reset。 它接受的最小或最大。 可有人请解释什么是最小和最大。 可以autooffset.reset在获得来自旧数据帮助抵消点,而不是最新的偏移点?
消费者总是属于一个组,每个分区时,动物园管理员保持跟踪消费群的分区中的进展情况。
从一开始获取数据,可以删除与进程相关联的侯赛因所指向的所有数据
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
您还可以指定偏移你想要的分区,如在核心/ src目录/主/斯卡拉/卡夫卡/工具/ UpdateOffsetsInZK.scala规定
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
但是偏移不是时间索引,但你知道每个分区是一个序列。
如果您的信息包含时间戳(和提防这个时间戳无关卡夫卡收到你的消息的那一刻),你可以尝试这样做,试图通过增加N个偏移量来检索步骤一个条目的索引,并存储元组(主题X,部分2,偏移100,时间戳)的某个地方。
当你想检索在指定时刻的条目,你可以申请一个二进制搜索到你粗糙的索引,直到你找到你想要的条目,并从那里获取。
从卡夫卡的文件 ,他们说:“kafka.api.OffsetRequest.EarliestTime()查找日志中的数据的开始,并开始从那里流,kafka.api.OffsetRequest.LatestTime()将只流新的消息。不要以为该偏移0开始偏移,因为信息时代了日志随着时间的推移“。
这里使用SimpleConsumerExample: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
类似的问题: 卡夫卡高级用户获取所有邮件主题中使用Java API(相当于--from-开始)
这可能帮助
请参阅有关卡夫卡配置的文档: http://kafka.apache.org/08/configuration.html有关偏置参数的最小值和最大值查询。
顺便说一句,在探索卡夫卡,我不知道如何来重播所有消息的消费者。 我的意思是,如果一个消费群体已调查所有的消息和它想重新得到这些。
可以实现的方式是从动物园管理员删除数据。 使用kafka.utils.ZkUtils类删除饲养员的一个节点。 下面是它的用法:
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
目前
卡夫卡常见问题给予回答这个问题。
如何准确地获取消息的偏移量使用OffsetRequest一定时间戳?
卡夫卡允许通过时间查询消息的偏移量,并且它在段粒度这样做。 时间戳参数是Unix时间戳和查询通过时间戳偏移返回一个不晚于在给定时间戳附加消息的偏移最新的可能。 有时间戳的2个特殊值 - 最新和最早的。 对于UNIX时间戳的任何其他值,卡夫卡将获得首发时创建不迟于给定时间戳日志段的偏移量。 由此,并且由于偏移请求仅在段粒度服务,偏移取请求返回对于较大的段大小较不准确的结果。
为了更准确的结果,您可以配置,而不是大小(log.segment.bytes)基于时间(log.roll.ms)日志段大小。 但是应该小心,因为这样做可能会增加文件句柄的数量由于频繁的日志段滚动服用。
未来计划
卡夫卡将添加时间戳消息格式。 请参阅
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
卡夫卡协议文件是一个伟大的来源与请求/响应/偏移/信息发挥: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol你用简单的消费例如为其中下面的代码演示状态:
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
设置readOffset以开始从初始偏移。 但你需要检查最大偏移以及上面会提供有限的偏移量addFetch方法的最后PARAM算作每FETCHSIZE。
使用KafkaConsumer可以使用求,SeekToBeginning和SeekToEnd到流中来回移动。
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)
此外,如果没有提供分区,将寻求所有当前指定分区的第一偏移。
你尝试过吗?
斌/ kafka-console-consumer.sh --bootstrap-服务器localhost:9092 --topic测试--from-开始
这将打印出所有在这个例子中,给定的主题,“测试”的消息。
从这个链接的更多细节https://kafka.apache.org/quickstart