I have a tab-separated CSV file similar to this:
County City Area Street
county1 city1 area1 street1
county1 city1 area2 street2
county1 city1 area3 street7
county1 city2 area2 street2
county1 city2 area6 street1
county2 city1 area3 street3
county2 city1 area3 street2
...
While parsing the CSV, I need to aggregate the rows that share the same County/City into a final structure like this:
county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]
Basically, it is a grouping by county/city.
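For reference, the target structure above is an ordered group-by on the county/city key. The following is a minimal plain-Java sketch of that grouping outside Camel (Groovy can call it directly). It assumes tab-separated lines, matching the separator = "\\t" in the Bindy annotation, a header row whose first field is County, and Java 9+ for List.of. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Camel): groups tab-separated
// "county city area street" lines by county/city.
class CountyCityGrouping {

    // Returns "county/city" -> list of [area, street] pairs.
    // LinkedHashMap keeps the groups in the order they first appear.
    static Map<String, List<List<String>>> group(List<String> lines) {
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            if (f.length < 4 || f[0].equals("County")) {
                continue; // skip the header row and malformed lines
            }
            String key = f[0] + "/" + f[1];
            groups.computeIfAbsent(key, k -> new ArrayList<>())
                  .add(List.of(f[2], f[3]));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "County\tCity\tArea\tStreet",
            "county1\tcity1\tarea1\tstreet1",
            "county1\tcity1\tarea2\tstreet2",
            "county1\tcity1\tarea3\tstreet7",
            "county1\tcity2\tarea2\tstreet2",
            "county1\tcity2\tarea6\tstreet1",
            "county2\tcity1\tarea3\tstreet3",
            "county2\tcity1\tarea3\tstreet2");
        group(lines).forEach((key, rows) -> System.out.println(key + ": " + rows));
    }
}
```

Running main prints one line per group, e.g. county1/city1: [[area1, street1], [area2, street2], [area3, street7]]. This version does not care whether rows of the same group are adjacent, but it keeps the whole file in memory.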
I have tried several approaches with Camel; this is the latest:
class CsvAppender {
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}
@CsvRecord(separator = "\\t")
class CsvRow {
    @DataField(pos = 1)
    private String county
    @DataField(pos = 2)
    private String city
    @DataField(pos = 3)
    private String area
    @DataField(pos = 4)
    private String street

    CsvRow previous

    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }

    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}
class CsvRouteBuilder extends RouteBuilder {
    void configure() {
        CsvAppender appender = new CsvAppender()

        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
            def nextAggregate = exchange.getIn().getHeader('NextAggregate')
            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('AggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }
            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }
            exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                         'NextAggregate': nextAggregate,
                                         'AggregateComplete': aggregateComplete])
            aggregateComplete
        }

        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
            .aggregate(constant(true), AggregationStrategies.bean(appender, "append")).completionPredicate(predicate)
            .process({ it.getOut().setBody(it.getIn().getHeader('CurrentAggregate')) })
            .convertBodyTo(String.class)
            .to("jms:myCsvSplitter")
    }
}
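The completion predicate in the route above is essentially a "control break": a group is complete when the county/city key changes, and the row that triggered the change becomes the first row of the next group. Outside Camel, that logic can be sketched in a few lines of Java. This is a hypothetical helper, not a Camel API. It assumes rows of the same county/city are contiguous in the file, as in the sample, and it only holds the group currently being built in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical helper (not a Camel API): emits one county/city group each
// time the key changes. Assumes rows of the same group are contiguous.
class ConsecutiveGrouper {
    private final BiConsumer<String, List<List<String>>> sink;
    private String currentKey;                        // null when no group is open
    private List<List<String>> current = new ArrayList<>();

    ConsecutiveGrouper(BiConsumer<String, List<List<String>>> sink) {
        this.sink = sink;
    }

    // Feeds one row; if its key differs from the open group's key,
    // the open group is emitted first and this row starts a new one.
    void accept(String county, String city, String area, String street) {
        String key = county + "/" + city;
        if (currentKey != null && !currentKey.equals(key)) {
            flush();
        }
        currentKey = key;
        current.add(List.of(area, street));
    }

    // Emits the open group, if any. Call once more at end of input.
    void flush() {
        if (currentKey != null) {
            sink.accept(currentKey, current);
        }
        currentKey = null;
        current = new ArrayList<>();
    }

    public static void main(String[] args) {
        ConsecutiveGrouper grouper = new ConsecutiveGrouper(
            (key, rows) -> System.out.println(key + ": " + rows));
        String[][] rows = {
            {"county1", "city1", "area1", "street1"},
            {"county1", "city1", "area2", "street2"},
            {"county1", "city1", "area3", "street7"},
            {"county1", "city2", "area2", "street2"},
            {"county1", "city2", "area6", "street1"},
            {"county2", "city1", "area3", "street3"},
            {"county2", "city1", "area3", "street2"},
        };
        for (String[] r : rows) {
            grouper.accept(r[0], r[1], r[2], r[3]);
        }
        grouper.flush(); // end of input: emit the last group
    }
}
```

How such a helper would be wired into the route (for example from a bean that receives each unmarshalled CsvRow) is left open here. Whatever holds this state must see the rows in file order, and the last group is only emitted once the end of the input is signalled.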
However, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks too verbose.
Any ideas on how to aggregate the CSV file properly?