我建立一个地形从卡夫卡得到的消息,然后用grep一些关键字,如果合适,写入本地文件。
我用OpaqueTridentKafkaSpout
风暴卡夫卡,以确保元组不会错过或重复,而是考虑一个情况:写作时消息的本地文件,有的发生错误(例如,没有足够的空间)。 这时,一些消息已经写入到本地文件,和其他人没有,如果喷口重新发送邮件,邮件将被写两次。
如何处理这个问题?
我建立一个地形从卡夫卡得到的消息,然后用grep一些关键字,如果合适,写入本地文件。
我用OpaqueTridentKafkaSpout
风暴卡夫卡,以确保元组不会错过或重复,而是考虑一个情况:写作时消息的本地文件,有的发生错误(例如,没有足够的空间)。 这时,一些消息已经写入到本地文件,和其他人没有,如果喷口重新发送邮件,邮件将被写两次。
如何处理这个问题?
这很简单。 一个写入文件中的代码需要做到以下几点:
1)确认的元组 - 仅当写入到一个文件是成功的。 2)失败的元组 - 如果写入到一个文件中没有成功。
对于已ack'd所有元组,卡夫卡口将不会重新发送它们。 失败元组将通过管口复位。
您必须设计锚固策略用于这一目的。 我建议你可以从kafkaspoutconfig并存储列表减少批量大小所选的消息。 在批处理的所有邮件进行处理,你可以写列表内容到本地文件。
如你所知,在三叉戟批量处理数据流,如果你的系统抛出任何错误,而处理任何流元组,所有批次都将被丢弃。
你的情况,你可以通过环绕尝试抓住你的代码块负责写入到本地文件,并在catch块,你必须抛出backtype.storm.topology.ReportedFailedException。 通过这种方式,可以确保只有一个语义。
你还必须使用事务卡夫卡嘴,以确保只有一个语义。