Using tick tuples with Trident in Storm

Posted 2019-06-27 19:44

I can use a standard spout and bolt combination to do streaming aggregation, and it works very well in the happy case, using tick tuples to persist data at intervals so that writes are batched. Right now I handle some of the failure management myself (tracking tuples that were not saved, and so on) rather than relying on what Storm provides out of the box.

But I have read that Trident offers a higher-level abstraction and better failure management. What I don't understand is whether Trident supports tick tuples. Basically, I would like to batch in memory for the current minute or so and persist the aggregated data for previous minutes using Trident.

Any pointers here or design suggestions would be helpful.

Thanks

1 answer
仙女界的扛把子
Reply #2 · 2019-06-27 20:01

Micro-batching is actually a built-in feature of Trident, so you don't need tick tuples for it. Suppose you have something like this in your code:

topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider),
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater()
    );

(Here I'm using my own custom ElasticSearch state and updater; you might use something else.)

With code like this, Trident groups your stream into batches under the hood and performs the partitionPersist operation on those batches rather than on individual tuples.
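To make the per-batch behaviour concrete, here is a small plain-Java sketch. It is not Storm code and does not reproduce how Trident forms batches (in Trident the spout decides what goes into a batch). The class `MicroBatchDemo`, the method `processInBatches`, and the fixed `batchSize` are all assumptions made up for illustration. The only point it shows is that the updater is called once per batch, not once per tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names exist in Storm.
class MicroBatchDemo {

    // Splits the tuples into consecutive batches and calls the updater once
    // per batch. Returns the number of updater calls that were made.
    static <T> int processInBatches(List<T> tuples, int batchSize, Consumer<List<T>> updater) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int calls = 0;
        for (int start = 0; start < tuples.size(); start += batchSize) {
            int end = Math.min(start + batchSize, tuples.size());
            // Copy the slice so the updater gets an independent list.
            updater.accept(new ArrayList<>(tuples.subList(start, end)));
            calls++;
        }
        return calls;
    }
}
```

With five tuples and a batch size of two, the updater would be invoked three times. A state updater written this way can issue one bulk write per call instead of one write per tuple.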

If you still need tick tuples for some reason, just create your own tick spout. Something like this works for me:

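// Package names assume Storm 1.x or later; older releases use
// backtype.storm.* and storm.trident.* instead.
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";

    // Pause before each tick, in milliseconds
    private final long delay;

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
        // Nothing to initialise
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        // Each batch holds a single tuple: the current time after sleeping
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
        // Nothing to clean up per batch
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        // No component-specific configuration
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}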
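
The question asks about keeping the current minute in memory and persisting only the previous minutes. One possible way to use the tick timestamp for that is sketched below in plain Java. `MinuteBuckets`, `add`, `flushBefore`, and `pendingMinutes` are hypothetical names, not part of Storm or Trident, and the sketch assumes events carry an epoch-millisecond timestamp and are counted per string key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helper, not part of Storm: keeps per-minute counts in memory
// and hands back only the minutes that are already complete.
class MinuteBuckets {
    private static final long MINUTE_MS = 60_000L;

    // Minute start (epoch millis) -> key -> count; TreeMap keeps minutes ordered.
    private final TreeMap<Long, Map<String, Long>> buckets = new TreeMap<>();

    // Adds one event to the bucket of the minute that contains eventTimeMs.
    void add(long eventTimeMs, String key, long count) {
        long minuteStart = eventTimeMs - Math.floorMod(eventTimeMs, MINUTE_MS);
        buckets.computeIfAbsent(minuteStart, m -> new HashMap<>())
               .merge(key, count, Long::sum);
    }

    // Removes and returns every bucket whose minute started before the minute
    // containing tickTimeMs. The tick's own minute stays in memory.
    TreeMap<Long, Map<String, Long>> flushBefore(long tickTimeMs) {
        long currentMinuteStart = tickTimeMs - Math.floorMod(tickTimeMs, MINUTE_MS);
        SortedMap<Long, Map<String, Long>> done = buckets.headMap(currentMinuteStart);
        TreeMap<Long, Map<String, Long>> flushed = new TreeMap<>(done);
        done.clear(); // headMap is a view, so this also removes from buckets
        return flushed;
    }

    // Number of minutes still held in memory.
    int pendingMinutes() {
        return buckets.size();
    }
}
```

On each tick, the caller would pass the tick's timestamp to `flushBefore` and persist whatever comes back. Late events for an already flushed minute would open a new bucket for that minute, so how to handle them is a separate design decision this sketch does not address.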