I built a topology that reads messages from Kafka, filters them by keyword, and writes each matching message to a local file.
I use the OpaqueTridentKafkaSpout from storm-kafka to make sure no tuple is lost or repeated. But consider this situation: while messages are being written to the local file, an error occurs (for example, the disk runs out of space). At that moment some messages have already been written to the file and others have not. If the spout then resends the messages, the ones already written will be written twice.
How should I handle this?
You need to design your anchoring strategy with this in mind. I suggest reducing the batch size in your Kafka spout config and storing the selected messages in a list. When all messages in the batch have been processed, write the contents of the list to the local file in one step.
Trident processes the stream in batches: if your code throws an error while processing any tuple in a batch, the whole batch is failed and its partial results are discarded.
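A minimal sketch of the "collect in a list, write once per batch" idea, written in Python for brevity rather than against the Trident Java API. Two choices here are assumptions, not part of the answer above: the output file is named after a hypothetical batch id `txid` (standing in for the transaction id Trident assigns to each batch), and the write goes to a temporary file that is then renamed into place, so a failed write never leaves a half-written file and a replayed batch overwrites its earlier output instead of appending to it.

```python
import os
import tempfile


def process_batch(txid, messages, keyword, out_dir):
    """Filter one batch and write all matching messages in a single step.

    `txid` is a hypothetical stand-in for the batch/transaction id that a
    Trident spout assigns; this function is not a Storm API.
    """
    # Collect matches in memory first; nothing touches the disk yet.
    matches = [m for m in messages if keyword in m]

    final_path = os.path.join(out_dir, f"batch-{txid}.txt")
    # Write to a temp file in the same directory, then rename it over the
    # final name. os.replace is atomic on POSIX when both paths are on the
    # same filesystem, so the final file is either absent/old or complete.
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            for m in matches:
                f.write(m + "\n")
        os.replace(tmp_path, final_path)
    except BaseException:
        # Remove the partial temp file, then re-raise so the batch can be
        # failed and replayed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

Calling `process_batch(7, ["error a", "ok b", "error c"], "error", some_dir)` twice leaves a single `batch-7.txt` containing `error a` and `error c` once each, which is the property the question asks for when a batch is resent.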
In your case, wrap the code block that writes to the local file in a try/catch, and in the catch block throw backtype.storm.topology.ReportedFailedException. That way the batch is failed and replayed, and you can ensure exactly-once semantics.
You also need a transactional Kafka spout (such as TransactionalTridentKafkaSpout) to get exactly-once semantics.
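The catch-and-rethrow pattern can be modeled in a few lines of Python. This is only a simulation under stated assumptions: `BatchFailed` is a hypothetical stand-in for `backtype.storm.topology.ReportedFailedException`, the dictionary `sink` stands in for the local file, and `run_with_replay` plays the role of the spout re-emitting a failed batch with the same id. It shows a disk error partway through a batch turning into a batch failure, followed by a clean replay with no duplicated output.

```python
class BatchFailed(Exception):
    """Hypothetical stand-in for Storm's ReportedFailedException: raising it
    tells the (simulated) framework to fail and replay the whole batch."""


def write_batch(txid, lines, sink, fail_on=None):
    """Hypothetical sink writer. `fail_on` injects an I/O error (e.g. a full
    disk) at that line index so the failure path can be demonstrated."""
    buffered = []
    try:
        for i, line in enumerate(lines):
            if fail_on is not None and i == fail_on:
                raise OSError("No space left on device")
            buffered.append(line)
        # Commit only after every line succeeded; a replay of the same txid
        # overwrites this entry rather than appending to it.
        sink[txid] = buffered
    except OSError as e:
        # Translate the I/O error into a batch failure, as the answer suggests.
        raise BatchFailed(f"batch {txid} failed: {e}") from e


def run_with_replay(txid, lines, sink, failures):
    """Simulated spout: re-emits the same batch (same txid) until it succeeds.
    `failures` lists an injected failure position for each early attempt."""
    attempts = 0
    while True:
        fail_on = failures[attempts] if attempts < len(failures) else None
        attempts += 1
        try:
            write_batch(txid, lines, sink, fail_on)
            return attempts
        except BatchFailed:
            continue  # the batch is replayed from the start
```

With `failures=[1]`, the first attempt fails after one line and commits nothing; the second attempt succeeds, so `sink` ends up holding each line of batch 3 exactly once.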
It's simple. The code that writes to the file needs to do the following:
1) Ack the tuple, but only if the write to the file succeeded.
2) Fail the tuple if the write to the file did NOT succeed.
The Kafka spout will NOT resend tuples that were acked. Failed tuples will be replayed by the spout.
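The ack/fail contract above can be sketched as a toy simulation in Python. It is not the storm-kafka API: `SimulatedSpout` and `write_bolt` are hypothetical names, and in real core Storm the bolt would call `ack(tuple)` or `fail(tuple)` on its `OutputCollector`. Per-tuple acking applies to the core Storm Kafka spout; Trident tracks completion per batch instead. The sketch shows that an acked message is never re-sent, while a failed one goes back on the queue and is written later.

```python
from collections import deque


class SimulatedSpout:
    """Toy model of a reliable spout: keeps emitted-but-unacked messages
    pending and re-queues them when fail() is called."""

    def __init__(self, messages):
        self.queue = deque(enumerate(messages))  # (msg_id, payload) pairs
        self.pending = {}

    def next_tuple(self):
        if not self.queue:
            return None
        msg_id, payload = self.queue.popleft()
        self.pending[msg_id] = payload
        return msg_id, payload

    def ack(self, msg_id):
        # Acked messages are forgotten and never re-sent.
        del self.pending[msg_id]

    def fail(self, msg_id):
        # Failed messages go back on the queue to be emitted again.
        self.queue.append((msg_id, self.pending.pop(msg_id)))


def write_bolt(spout, tup, out, broken_ids):
    """Acks the tuple only if the simulated write succeeds; otherwise fails it.
    Ids in `broken_ids` fail once (simulating a full disk), then recover."""
    msg_id, payload = tup
    try:
        if msg_id in broken_ids:
            broken_ids.discard(msg_id)
            raise OSError("No space left on device")
        out.append(payload)
    except OSError:
        spout.fail(msg_id)
    else:
        spout.ack(msg_id)
```

Driving this with messages `m0`, `m1`, `m2` and a one-time failure on `m1` produces `["m0", "m2", "m1"]`: every message is written once, and the failed one arrives late. One caveat applies to real systems: if a worker crashes after the write but before the ack reaches the spout, the message is replayed and written again, so this pattern on its own is at-least-once.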