How to aggregate CSV lines with Apache Camel?

Posted 2019-09-06 14:01

I have a CSV similar to this:

County  City  Area  Street
county1 city1 area1 street1
county1 city1 area2 street2
county1 city1 area3 street7
county1 city2 area2 street2
county1 city2 area6 street1
county2 city1 area3 street3
county2 city1 area3 street2
...

During the CSV parsing, I need to aggregate the same County/City to create a final structure like this:

county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]

Basically, a grouping by county/city.
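Without Camel, the target structure is an ordinary group-by on a composite county/city key. Here is a minimal sketch in plain Java (8 or later). It uses a hypothetical `Row` class as a stand-in for `CsvRow` and involves no Camel API:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByCountyCity {

    // Hypothetical stand-in for the question's CsvRow: one parsed CSV line.
    static final class Row {
        final String county, city, area, street;

        Row(String county, String city, String area, String street) {
            this.county = county;
            this.city = city;
            this.area = area;
            this.street = street;
        }
    }

    // Groups rows by "county/city" and keeps the [area, street] pairs in input order.
    static Map<String, List<List<String>>> group(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                r -> r.county + "/" + r.city,
                LinkedHashMap::new, // keep groups in first-seen order
                Collectors.mapping(r -> Arrays.asList(r.area, r.street), Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("county1", "city1", "area1", "street1"),
                new Row("county1", "city1", "area2", "street2"),
                new Row("county1", "city1", "area3", "street7"),
                new Row("county1", "city2", "area2", "street2"),
                new Row("county1", "city2", "area6", "street1"),
                new Row("county2", "city1", "area3", "street3"),
                new Row("county2", "city1", "area3", "street2"));
        group(rows).forEach((key, pairs) -> System.out.println(key + ": " + pairs));
    }
}
```

Running `main` prints:

county1/city1: [[area1, street1], [area2, street2], [area3, street7]]
county1/city2: [[area2, street2], [area6, street1]]
county2/city1: [[area3, street3], [area3, street2]]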

I've tried several approaches with Camel; this is the latest:

class CsvAppender {
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}

@CsvRecord(separator = "\\t")
class CsvRow {
    @DataField(pos = 1)
    private String county

    @DataField(pos = 2)
    private String city

    @DataField(pos = 3)
    private String area

    @DataField(pos = 4)
    private String street

    CsvRow previous

    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }

    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}

class CsvRouteBuilder extends RouteBuilder {

    void configure() {
        CsvAppender appender = new CsvAppender()

        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
            def nextAggregate = exchange.getIn().getHeader('NextAggregate')

            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('AggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }

            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }
            exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                         'NextAggregate': nextAggregate,
                                         'AggregateComplete': aggregateComplete])
            aggregateComplete
        }

        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
                .aggregate(constant(true), AggregationStrategies.bean(appender, "append")).completionPredicate(predicate)
                .process({
                    it.getOut().setBody(it.getIn().getHeader('CurrentAggregate')) })
                .convertBodyTo(String.class)
            .to("jms:myCsvSplitter")
    }
}

However, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks too verbose.
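The "previous row" idea comes down to closing a group whenever the county/city key changes. This only works if rows that share a key are adjacent in the file. The last group also has to be flushed explicitly once input ends, because no further key change will arrive to close it. Here is a minimal plain-Java sketch of that technique. `ConsecutiveGrouper` is a hypothetical class, not a Camel API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Emits a group every time the key changes. Assumes rows with the same key
// are adjacent, as in the sample CSV; non-adjacent rows would form separate groups.
public class ConsecutiveGrouper<T> {
    private final Function<T, String> keyOf;
    private final Consumer<List<T>> sink;
    private String currentKey; // null until the first item arrives
    private List<T> current = new ArrayList<>();

    public ConsecutiveGrouper(Function<T, String> keyOf, Consumer<List<T>> sink) {
        this.keyOf = keyOf;
        this.sink = sink;
    }

    // Called once per parsed line.
    public void accept(T item) {
        String key = keyOf.apply(item);
        if (currentKey != null && !currentKey.equals(key)) {
            sink.accept(current); // key changed: the previous group is complete
            current = new ArrayList<>();
        }
        currentKey = key;
        current.add(item);
    }

    // Must be called at end of input; otherwise the last group is never emitted.
    public void finish() {
        if (!current.isEmpty()) {
            sink.accept(current);
            current = new ArrayList<>();
        }
        currentKey = null;
    }

    public static void main(String[] args) {
        String[] lines = {
            "county1\tcity1\tarea1\tstreet1",
            "county1\tcity1\tarea2\tstreet2",
            "county1\tcity1\tarea3\tstreet7",
            "county1\tcity2\tarea2\tstreet2",
            "county1\tcity2\tarea6\tstreet1",
            "county2\tcity1\tarea3\tstreet3",
            "county2\tcity1\tarea3\tstreet2"
        };
        ConsecutiveGrouper<String[]> grouper = new ConsecutiveGrouper<>(
                fields -> fields[0] + "/" + fields[1],
                group -> System.out.println(group.get(0)[0] + "/" + group.get(0)[1] + ": " + group.size() + " rows"));
        for (String line : lines) {
            grouper.accept(line.split("\t"));
        }
        grouper.finish();
    }
}
```

With the sample rows, `main` prints `county1/city1: 3 rows`, `county1/city2: 2 rows` and `county2/city1: 2 rows`. Unlike a map of all groups, this keeps only one group in memory at a time. The cost is that the input must already be sorted by county and city.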

Any idea how to aggregate the CSV file properly?

1 answer
做个烂人
Post #2 · 2019-09-06 14:21

I've got some rough code that works and should hopefully be good enough to help you along. It's in Java rather than Groovy, on the grounds that my Groovy isn't up to much. It should be easy enough to translate, though.

First, the aggregator:

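import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
// Camel 2.x location; in Camel 3.x the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAgregationStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // Assumes CsvRow exposes a getCounty() getter
        CsvRow newBody = newExchange.getIn().getBody(CsvRow.class);
        if (oldExchange == null) {
            // First row: start a new map keyed by county and use this exchange as the aggregate
            Map<String, List<CsvRow>> map = new HashMap<>();
            List<CsvRow> list = new ArrayList<>();
            list.add(newBody);
            map.put(newBody.getCounty(), list);
            newExchange.getIn().setBody(map);
            return newExchange;
        }
        // Later rows: append to this county's list, creating the list if needed
        Map<String, List<CsvRow>> map = oldExchange.getIn().getBody(Map.class);
        map.computeIfAbsent(newBody.getCounty(), k -> new ArrayList<>()).add(newBody);

        // Copy the splitter's "last line" flag onto the aggregate so the completion predicate can see it
        oldExchange.setProperty(Exchange.SPLIT_COMPLETE, newExchange.getProperty(Exchange.SPLIT_COMPLETE));
        return oldExchange;
    }
}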

This stores the rows in a list in a map, keyed by the county.
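The map-building logic can be tested outside Camel with a plain-Java sketch that has the same shape as the strategy. The first call receives `null` as the previous state, and each later call adds one row. The sketch keys on county and city together, which is the grouping the question asks for. `Row` and `aggregate` are hypothetical stand-ins for `CsvRow` and the `Exchange` handling:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CountyCityAggregation {

    // Hypothetical stand-in for CsvRow.
    static final class Row {
        final String county, city, area, street;

        Row(String county, String city, String area, String street) {
            this.county = county;
            this.city = city;
            this.area = area;
            this.street = street;
        }

        @Override
        public String toString() {
            return "[" + area + ", " + street + "]";
        }
    }

    // Same shape as AggregationStrategy.aggregate(old, new): 'previous' is null on the first call.
    static Map<String, List<Row>> aggregate(Map<String, List<Row>> previous, Row next) {
        Map<String, List<Row>> map = (previous == null) ? new LinkedHashMap<String, List<Row>>() : previous;
        // Composite key: groups by county AND city instead of county alone.
        map.computeIfAbsent(next.county + "/" + next.city, key -> new ArrayList<>()).add(next);
        return map;
    }

    public static void main(String[] args) {
        Row[] rows = {
            new Row("county1", "city1", "area1", "street1"),
            new Row("county1", "city1", "area2", "street2"),
            new Row("county1", "city1", "area3", "street7"),
            new Row("county1", "city2", "area2", "street2"),
            new Row("county1", "city2", "area6", "street1"),
            new Row("county2", "city1", "area3", "street3"),
            new Row("county2", "city1", "area3", "street2")
        };
        Map<String, List<Row>> state = null;
        for (Row row : rows) {
            state = aggregate(state, row); // one call per split line
        }
        state.forEach((key, group) -> System.out.println(key + ": " + group));
    }
}
```

A `LinkedHashMap` is used here instead of a `HashMap`, so the groups come out in the order they first appear in the file.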

Then the route:

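import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file:/c:/dev/test?noop=true")
        .split(body().tokenize("\n"))
        .log("Read line ${body}")
        .unmarshal()
        .bindy(BindyType.Csv, CsvRow.class)
            // One aggregation group (constant key), completed once the splitter has emitted its last line.
            // Newer Camel versions spell this ${exchangeProperty.CamelSplitComplete}
            .aggregate(constant(true), new MyAgregationStrategy()).completionPredicate(simple("${property.CamelSplitComplete} == true"))
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                Map<?, ?> results = exchange.getIn().getBody(Map.class);
                System.out.println("Got results for " + results.size() + " counties");
            }
        });
    }
}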

It uses the CamelSplitComplete property to detect when the splitting is finished. In the processor at the end, you can then do whatever you like with the map. Alternatively, you can change the aggregation strategy to aggregate the results however you need.
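The aggregator copies `CamelSplitComplete` from the incoming exchange onto `oldExchange`. This is needed because, with Camel's default of `eagerCheckCompletion` turned off, the completion predicate is evaluated against the aggregated exchange that the strategy returns, not the incoming one. The plain-Java model below illustrates that flow. `Item`, `Aggregate` and `run` are hypothetical names, and nothing here is Camel API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the answer's completion trick. Nothing here is Camel API.
public class SplitCompleteModel {

    // Hypothetical stand-in for one split Exchange: a body plus the CamelSplitComplete flag.
    static final class Item {
        final String body;
        final boolean splitComplete;

        Item(String body, boolean splitComplete) {
            this.body = body;
            this.splitComplete = splitComplete;
        }
    }

    // Hypothetical stand-in for the aggregated Exchange returned by the strategy.
    static final class Aggregate {
        final List<String> bodies = new ArrayList<>();
        boolean splitComplete;
    }

    static Aggregate aggregate(Aggregate old, Item next) {
        Aggregate agg = (old == null) ? new Aggregate() : old;
        agg.bodies.add(next.body);
        // The equivalent of oldExchange.setProperty("CamelSplitComplete", ...):
        // without it the aggregate keeps its initial (false) flag and never completes.
        agg.splitComplete = next.splitComplete;
        return agg;
    }

    // Feeds the lines through the aggregator and collects every completed group.
    static List<List<String>> run(List<String> lines) {
        List<List<String>> completed = new ArrayList<>();
        Aggregate agg = null;
        for (int i = 0; i < lines.size(); i++) {
            boolean last = (i == lines.size() - 1); // the splitter flags only the final line
            agg = aggregate(agg, new Item(lines.get(i), last));
            if (agg.splitComplete) { // the completion predicate, checked on the aggregate
                completed.add(agg.bodies);
                agg = null; // a new group would start with the next item
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("line1", "line2", "line3")));
    }
}
```

`main` prints `[[line1, line2, line3]]`, which is a single group completed on the last line.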

Hope this helps.
