我想产生连续的市场数据的聚合视图,这意味着我们需要计算总和值每2消息。 说来的数据作为:
(V0,T0),(V1,T1),(V2,T2),(V3,T3)....
V
表示值T
装置时间戳当我们收到的数据。
我们需要每2点说,产生的总和:
(R1=Sum(V0,V1),T1),(R2=Sum(V1,V2),T2),(R3=Sum(V2,V3),T3),....
任何建议,我们如何能做到这一点,通过使用aggregator2
否则就要为此编写的处理器?
我想产生连续的市场数据的聚合视图,这意味着我们需要计算总和值每2消息。 说来的数据作为:
(V0,T0),(V1,T1),(V2,T2),(V3,T3)....
V
表示值T
装置时间戳当我们收到的数据。
我们需要每2点说,产生的总和:
(R1=Sum(V0,V1),T1),(R2=Sum(V1,V2),T2),(R3=Sum(V2,V3),T3),....
任何建议,我们如何能做到这一点,通过使用aggregator2
否则就要为此编写的处理器?
你说得对, aggregator2组件是要走的好办法。 我会尝试这样的事情:
from("somewhere").split(body().tokenize("),")).streaming()
.aggregate(new ValueAggregationStrategy()).completionTimeout(1500)
.to("whatYouWant");
class ValueAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(extractValue(oldBody) + extractValue(newBody));
return oldExchange;
}
public int extractValue(String body) {
// Do the work "(V0,T0" -> "V0"
}
}
注意:这将是更容易分析,如果你能有一个这样的格式: V0,T0;V1,T1...
欲了解更多信息: 这里是一篇关于解析大文件,骆驼写道克劳斯易卜生
阅读聚合器的源代码后,事实证明,只有骆驼聚集一个消息给一组,我们必须建立用于此目的的“聚合”。 这里是代码:
public abstract class GroupingGenerator<I> implements Processor {
private final EvictingQueue<I> queue;
private final int size;
public int getSize() {
return size;
}
public GroupingGenerator(int size) {
super();
this.size = size;
this.queue = EvictingQueue.create(size);
}
@SuppressWarnings("unchecked")
@Override
public void process(Exchange exchange) throws Exception {
queue.offer((I) exchange.getIn().getBody());
if (queue.size() != size) {
exchange.setProperty(Exchange.ROUTE_STOP, true);
return;
} else {
processGroup(queue, exchange);
}
}
protected abstract void processGroup(Collection<I> items, Exchange exchange);
}