How to ensure Storm does not write a message twice to a local file

Posted 2019-08-07 15:02

I built a topology that reads messages from Kafka, greps them for certain keywords, and writes each matching message to a local file.

I use OpaqueTridentKafkaSpout from storm-kafka to ensure that tuples are neither lost nor repeated. But consider this situation: an error occurs while messages are being written to the local file (for example, the disk runs out of space). At that point some messages have already been written to the file and others have not. If the spout resends the messages, those already written will be written twice.

How to handle that?

2 Answers
This account has been banned
#2 · 2019-08-07 15:38

You must design your anchoring strategy for this purpose. I suggest decreasing the batch size in kafkaspoutconfig and storing the selected messages in a list. Once all messages in the batch have been processed, write the contents of the list to the local file.

As you know, Trident processes the stream in batches. If your system throws an error while processing any tuple in a batch, the whole batch is discarded.

In your case, wrap the code block that writes to the local file in a try/catch, and in the catch block throw backtype.storm.topology.ReportedFailedException. This way you can ensure exactly-once semantics.

You must also use a transactional Kafka spout to ensure exactly-once semantics.
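
The buffering and try/catch advice above could be sketched roughly as follows. This is a minimal sketch in plain Java, not code from the Storm API: BatchFileWriter, markerFile and writeBatch are hypothetical names, and txId is assumed to be the Trident transaction id of the batch, which stays the same when a batch is replayed. Before writing a batch, the sketch records the file offset at which that batch starts, so a replay of the same batch can first cut off whatever the failed attempt left behind. It does not make the marker write atomic or force data to disk; a real implementation would probably need both.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical helper (not part of Storm): appends one buffered batch per call. */
final class BatchFileWriter {
    private final Path dataFile;
    private final Path markerFile; // holds "<txId> <offset>" for the most recent batch

    BatchFileWriter(Path dataFile, Path markerFile) {
        this.dataFile = dataFile;
        this.markerFile = markerFile;
    }

    /**
     * Appends all lines of one batch. If the same txId is seen again (a replay),
     * the file is first truncated back to where that batch started, so lines
     * from the earlier attempt are not duplicated.
     */
    void writeBatch(long txId, List<String> lines) throws IOException {
        long offset = Files.exists(dataFile) ? Files.size(dataFile) : 0L;
        if (Files.exists(markerFile)) {
            String marker = new String(Files.readAllBytes(markerFile), StandardCharsets.UTF_8);
            String[] parts = marker.trim().split(" ");
            if (Long.parseLong(parts[0]) == txId) {
                // Replay of the same batch: undo the earlier (partial or full) attempt.
                offset = Long.parseLong(parts[1]);
                truncate(offset);
            }
        }
        // Record where this batch starts before writing any of it.
        Files.write(markerFile, (txId + " " + offset).getBytes(StandardCharsets.UTF_8));
        // An IOException here (for example, disk full) propagates to the caller.
        // Inside a Trident component, the caller would catch it and throw
        // ReportedFailedException so that the batch is replayed.
        Files.write(dataFile, lines, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    private void truncate(long size) throws IOException {
        if (!Files.exists(dataFile)) {
            return; // nothing was written by the earlier attempt
        }
        try (FileChannel channel = FileChannel.open(dataFile, StandardOpenOption.WRITE)) {
            channel.truncate(size);
        }
    }
}
```

With this shape, calling writeBatch twice with the same txId leaves a single copy of that batch in the file, whether the first attempt failed halfway or completed.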

Luminary・发光体
#3 · 2019-08-07 15:55

It's simple. The code that writes to the file needs to do the following:

1) Ack the tuple - only if the write to the file is successful.
2) Fail the tuple - if the write to the file was NOT successful.

For all tuples that were ack'd, the Kafka spout will NOT resend them. Failed tuples will be resent by the spout.
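
The ack/fail rule above could look roughly like this. It is a sketch in plain Java rather than real bolt code: AckCollector is a hypothetical stand-in for the ack and fail methods of Storm's OutputCollector, and FileWritingStep stands in for a bolt's execute method. It assumes a plain (non-Trident) bolt, where the bolt acks each tuple itself.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

/** Hypothetical stand-in for the ack/fail methods of Storm's OutputCollector. */
interface AckCollector<T> {
    void ack(T tuple);
    void fail(T tuple);
}

/** Hypothetical stand-in for a bolt that appends each message to a file. */
final class FileWritingStep {
    private final Path file;
    private final AckCollector<String> collector;

    FileWritingStep(Path file, AckCollector<String> collector) {
        this.file = file;
        this.collector = collector;
    }

    /** Acks the message only after the append succeeded; fails it otherwise. */
    void execute(String message) {
        try {
            Files.write(file, Collections.singletonList(message), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            collector.ack(message);   // written: the spout need not resend it
        } catch (IOException e) {
            collector.fail(message);  // not written: ask the spout to resend it
        }
    }
}
```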
