I have a CSV similar to this:
County City Area Street
county1 city1 area1 street1
county1 city1 area2 street2
county1 city1 area3 street7
county1 city2 area2 street2
county1 city2 area6 street1
county2 city1 area3 street3
county2 city1 area3 street2
...
While parsing the CSV, I need to aggregate rows with the same County/City into a final structure like this:
county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]
Basically, a grouping by county/city.
I've tried different things with Camel; this is the latest:
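For reference, the target structure is a plain group-by on the first two columns. Here is a minimal plain-Java sketch of that grouping outside Camel. It assumes the rows are whitespace-separated and that the header line has already been skipped. The class and method names are hypothetical.

```java
import java.util.*;
import java.util.stream.*;

class GroupByCountyCity {
    // Groups "county city area street" rows into "county/city" -> [[area, street], ...].
    // LinkedHashMap keeps keys in first-seen order; toList() keeps rows in input order.
    static Map<String, List<List<String>>> group(List<String> lines) {
        return lines.stream()
                .map(line -> line.trim().split("\\s+"))
                .collect(Collectors.groupingBy(
                        cols -> cols[0] + "/" + cols[1],
                        LinkedHashMap::new,
                        Collectors.mapping(cols -> List.of(cols[2], cols[3]), Collectors.toList())));
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "county1 city1 area1 street1",
                "county1 city1 area2 street2",
                "county1 city1 area3 street7",
                "county1 city2 area2 street2",
                "county1 city2 area6 street1",
                "county2 city1 area3 street3",
                "county2 city1 area3 street2");
        group(rows).forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

Running it prints one line per county/city pair in the same shape as the listing above, for example `county1/city1: [[area1, street1], [area2, street2], [area3, street7]]`.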
class CsvAppender {
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}

@CsvRecord(separator = "\\t")
class CsvRow {
    @DataField(pos = 1)
    private String county
    @DataField(pos = 2)
    private String city
    @DataField(pos = 3)
    private String area
    @DataField(pos = 4)
    private String street
    CsvRow previous
    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }
    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}

class CsvRouteBuilder extends RouteBuilder {
    void configure() {
        CsvAppender appender = new CsvAppender()
        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
            def nextAggregate = exchange.getIn().getHeader('NextAggregate')
            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('AggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }
            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }
            exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                         'NextAggregate': nextAggregate,
                                         'AggregateComplete': aggregateComplete])
            aggregateComplete
        }
        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
                .unmarshal().bindy(BindyType.Csv, CsvRow.class)
                .aggregate(constant(true), AggregationStrategies.bean(appender, "append"))
                    .completionPredicate(predicate)
                .process({
                    it.getOut().setBody(it.getIn().getHeader('CurrentAggregate')) })
                .convertBodyTo(String.class)
                .to("jms:myCsvSplitter")
    }
}
Anyway, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks too verbose.
Any idea how to aggregate the CSV file properly?
I've got some rough code that works and should hopefully be good enough to help you along. It's in Java rather than Groovy, since my Groovy isn't up to much, but it should be easy enough to translate.
First, the aggregator:
This stores the rows in a list in a map, keyed by the county.
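As a rough, Camel-free illustration of what such an aggregator does on each call, here is a plain-Java sketch of a single aggregation step. It keys by county and city, as the question asks, rather than by county alone. In Camel, an aggregation strategy receives a null old exchange for the first message of a group. The sketch mimics that with a null map. The class and method names are hypothetical and are not part of Camel's `AggregationStrategy` API.

```java
import java.util.*;

class CountyCityAggregator {
    // One aggregation step: 'state' is null on the first call (like Camel's oldExchange
    // for the first message); each call folds one parsed row into the map and returns it.
    static Map<String, List<List<String>>> aggregate(Map<String, List<List<String>>> state, String[] row) {
        if (state == null) {
            state = new LinkedHashMap<>();
        }
        String key = row[0] + "/" + row[1];                // county/city
        state.computeIfAbsent(key, k -> new ArrayList<>())
             .add(List.of(row[2], row[3]));                 // [area, street]
        return state;
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> state = null;
        for (String line : new String[] {
                "county1 city2 area2 street2",
                "county1 city2 area6 street1",
                "county2 city1 area3 street3"}) {
            state = aggregate(state, line.split("\\s+"));
        }
        System.out.println(state);
    }
}
```

Because the map is carried forward from call to call, no row needs a "previous" pointer. Each row only has to know its own key.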
Then the route:
It uses the CamelSplitComplete property to detect when the splitting is finished. In the processor at the end, you can then do what you like with the map. Alternatively, you can change the aggregation strategy to aggregate the results however you need.
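Camel's splitter sets the CamelSplitComplete exchange property to true on the last split message, which is why it works as a completion signal. The sketch below simulates that flow in plain Java without Camel. Each line is folded into the map, and the finished map is handed to a callback only when the simulated "split complete" flag is true. The callback plays the role of the final processor. The names `SplitThenAggregate`, `run`, and `onComplete` are hypothetical.

```java
import java.util.*;
import java.util.function.Consumer;

class SplitThenAggregate {
    // Folds each line into the map and passes the finished map to 'onComplete'
    // only when the simulated split-complete flag is set (i.e. on the last line).
    static void run(List<String> lines, Consumer<Map<String, List<List<String>>>> onComplete) {
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (int i = 0; i < lines.size(); i++) {
            String[] cols = lines.get(i).trim().split("\\s+");
            groups.computeIfAbsent(cols[0] + "/" + cols[1], k -> new ArrayList<>())
                  .add(List.of(cols[2], cols[3]));
            boolean splitComplete = (i == lines.size() - 1);  // stands in for CamelSplitComplete
            if (splitComplete) {
                onComplete.accept(groups);                    // stands in for the final processor
            }
        }
    }

    public static void main(String[] args) {
        run(List.of("county1 city1 area1 street1",
                    "county2 city1 area3 street3",
                    "county2 city1 area3 street2"),
            groups -> groups.forEach((key, rows) -> System.out.println(key + ": " + rows)));
    }
}
```

With a single completion at the end, the whole file forms one aggregate, so nothing has to detect group boundaries row by row.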
Hope this helps.