I am able to use a standard spout/bolt combination to do streaming aggregation,
and it works very well in the happy case, using tick tuples to persist data at some interval
to take advantage of batching. Right now I am doing some failure management (tracking tuples that were not saved, etc.) myself, i.e. not out of the box from Storm.
But I have read that Trident gives you a higher abstraction and better failure management.
What I don't understand is whether there is tick tuple support in Trident. Basically,
I would like to batch in memory for the current minute or so and persist any aggregated data
for the previous minutes using Trident.
Any pointers here or design suggestions would be helpful.
Thanks
Actually, micro-batching is a built-in Trident feature. You don't need tick tuples for that. When you have something like this in your code:
    topology
        .newStream("myStream", spout)
        .partitionPersist(
            ElasticSearchEventState.getFactoryFor(connectionProvider),
            new Fields("field1", "field2"),
            new ElasticSearchEventUpdater()
        );
(I'm using my custom ElasticSearch state/updater here; you might use something else.)
With code like this, under the hood Trident groups your stream into batches and performs the partitionPersist operation not on individual tuples but on those batches.
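To picture what "per batch rather than per tuple" means for an updater, here is a small plain-Java model with no Storm dependency. `BatchWriter` and `MicroBatcher` are hypothetical names made up for this illustration, not Trident classes, and the size-based cutoff is an assumption for simplicity. In Trident the batch boundaries come from the spout, so treat this as a conceptual sketch only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a state updater: called once per batch, not once per tuple.
interface BatchWriter<T> {
    void write(List<T> batch);
}

// Conceptual model only: buffers tuples and hands them to the writer in groups.
final class MicroBatcher<T> {
    private final int maxBatchSize;
    private final BatchWriter<T> writer;
    private final List<T> pending = new ArrayList<>();

    MicroBatcher(int maxBatchSize, BatchWriter<T> writer) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.writer = writer;
    }

    // Buffers one tuple; triggers a single write once the batch is full.
    void add(T tuple) {
        pending.add(tuple);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    // Writes whatever is buffered as one batch (no-op when nothing is pending).
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        writer.write(new ArrayList<>(pending)); // copy so the writer owns its batch
        pending.clear();
    }
}
```

With a batch size of 2, adding three tuples results in one write of two tuples, and the third is written only on the next `flush()`. That is the same reason a bulk-capable store benefits from `partitionPersist`: the updater sees a list of tuples and can issue one bulk request.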
If you still need tick tuples for any reason, just create your own tick spout. Something like this works for me:
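    // Imports assume Storm 1.x or later (org.apache.storm.*);
    // older releases use backtype.storm.* and storm.trident.* instead.
    import java.util.Map;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.IBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    // Emits one single-tuple batch carrying the current time, roughly every `delay` milliseconds.
    public class TickSpout implements IBatchSpout {
        public static final String TIMESTAMP_FIELD = "timestamp";
        private final long delay; // pause between ticks, in milliseconds

        public TickSpout(long delay) {
            this.delay = delay;
        }

        @Override
        public void open(Map conf, TopologyContext context) {
            // nothing to initialize
        }

        @Override
        public void emitBatch(long batchId, TridentCollector collector) {
            // Sleeping here throttles batch emission, which is what produces the tick interval.
            Utils.sleep(delay);
            collector.emit(new Values(System.currentTimeMillis()));
        }

        @Override
        public void ack(long batchId) {
            // ticks are not replayed, so there is nothing to clean up
        }

        @Override
        public void close() {
        }

        @Override
        public Map getComponentConfiguration() {
            return null; // no component-specific configuration
        }

        @Override
        public Fields getOutputFields() {
            return new Fields(TIMESTAMP_FIELD);
        }
    }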
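For the per-minute use case in the question, one possible shape for the in-memory aggregation that such a tick timestamp could drive is sketched below. It is plain Java with no Storm dependency. `MinuteAggregator`, `record`, and `drainCompleted` are hypothetical names introduced for illustration, and counting occurrences per string key is an assumption. Wiring it into a Trident function or state, and handling replays, is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: keeps per-minute counts in memory and releases finished minutes.
final class MinuteAggregator {
    private static final long MINUTE_MS = 60_000L;

    // minute start (epoch ms) -> key -> count; TreeMap keeps the minutes ordered
    private final TreeMap<Long, Map<String, Long>> buckets = new TreeMap<>();

    // Rounds a timestamp down to the start of its minute.
    private static long minuteOf(long timeMs) {
        return timeMs - Math.floorMod(timeMs, MINUTE_MS);
    }

    // Adds one occurrence of `key` to the bucket of the minute containing eventTimeMs.
    void record(String key, long eventTimeMs) {
        buckets.computeIfAbsent(minuteOf(eventTimeMs), m -> new HashMap<>())
               .merge(key, 1L, Long::sum);
    }

    // Removes and returns every bucket for minutes strictly before the minute of nowMs.
    // The current minute stays in memory and keeps accumulating.
    TreeMap<Long, Map<String, Long>> drainCompleted(long nowMs) {
        Map<Long, Map<String, Long>> finished = buckets.headMap(minuteOf(nowMs), false);
        TreeMap<Long, Map<String, Long>> drained = new TreeMap<>(finished);
        finished.clear(); // headMap is a view, so this removes the entries from `buckets`
        return drained;
    }
}
```

On each tick you would call `drainCompleted(tickTimestamp)` and persist whatever comes back. Note that an event arriving late for a minute that was already drained simply opens a new bucket for that minute, so the persistence side would need to merge rather than overwrite.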