Read CSV file concurrently using Spring Integration

Posted 2019-03-02 02:26

I would like to process a CSV file concurrently using Spring Integration. Each row will be converted into an individual message. So, assuming I have 10K rows in the CSV file, I would like to start 10 threads and pass each row to one of these threads. It would be great if anyone could show me a sample.

Thanks

2 answers
Anthone
Post #2 · 2019-03-02 02:46

I'm using Spring Integration 4.1.0 and have tried your suggestion; however, it doesn't seem to work for me. I got a chance to look into this a bit today, and I now lean toward it being a bug in Spring Integration 4.1.0.

See if what I explain makes sense.

If you try this example, you'll see that it works (note that this does NOT use your SpEL example):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:inbound-channel-adapter id="exchangeReplayFileAdapter" ref="exchangeReplayFileReadingMessageSource" method="receive" auto-startup="true" channel="channel1">
        <int:poller fixed-delay="10000000" />
    </int:inbound-channel-adapter>

    <bean id="exchangeReplayFileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
        <property name="directory" value="/tmp/inputdir" />
    </bean>

    <int:channel id="channel1">
        <int:dispatcher task-executor="taskExecutor" />
    </int:channel>

    <int:splitter input-channel="channel1" output-channel="channel2">
        <bean class="com.xxx.common.util.springintegration.FileSplitter" />
    </int:splitter>

    <int:channel id="channel2"></int:channel>
    <int-stream:stdout-channel-adapter channel="channel2"></int-stream:stdout-channel-adapter>

    <task:executor id="taskExecutor" pool-size="1" />
</beans>

With this splitter implementation:

package com.xxx.common.util.springintegration;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
 * Splits a java.io.File payload into one message per line of the file.
 */
public class FileSplitter extends AbstractMessageSplitter {
    // Logger must reference this class (FileSplitterNew does not exist)
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    @Override
    public Object splitMessage(Message<?> message) {
        if (log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {
            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
            // Returning an Iterator (not a List) lets the splitter read lines lazily,
            // turning each element into one outbound message
            return FileUtils.lineIterator((File) payload);
        } catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

}
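`FileUtils.lineIterator` matters here because it returns a lazy `Iterator`, so the file is read one line at a time instead of being loaded into a `List` first. If you would rather not depend on Commons IO, a JDK-only equivalent could look like the sketch below. `LazyLines` is a hypothetical class and is not part of Commons IO or Spring Integration. It swaps in a plain `BufferedReader` for `LineIterator`, and the caller has to close it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical JDK-only stand-in for Commons IO's LineIterator:
// each next() reads one more line, so the whole file is never held in memory.
final class LazyLines implements Iterator<String>, AutoCloseable {

    private final BufferedReader reader;
    private String nextLine; // line read ahead, or null at end of file

    LazyLines(Path file) throws IOException {
        this.reader = Files.newBufferedReader(file, StandardCharsets.UTF_8);
        try {
            this.nextLine = reader.readLine();
        } catch (IOException e) {
            reader.close(); // do not leak the reader if the first read fails
            throw e;
        }
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public String next() {
        if (nextLine == null) {
            throw new NoSuchElementException();
        }
        String current = nextLine;
        try {
            nextLine = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return current;
    }

    // The caller must close the reader once iteration is finished.
    @Override
    public void close() throws IOException {
        reader.close();
    }
}
```

Using it with try-with-resources, for example `try (LazyLines lines = new LazyLines(path)) { ... }`, guarantees that the file handle is released even if processing stops early.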

Using your SpEL example:

<int:splitter input-channel="exchangeReplayFiles" output-channel="exchangeSpringQueueChannel"  
    expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>

What the parser creates internally is this (notice the List.class type passed to the ExpressionEvaluatingMessageProcessor constructor):

/**
 * A Message Splitter implementation that evaluates the specified SpEL
 * expression. The result of evaluation will typically be a Collection or
 * Array. If the result is not a Collection or Array, then the single Object
 * will be returned as the payload of a single reply Message.
 *
 * @author Mark Fisher
 * @author Gary Russell
 * @since 2.0
 */
public class ExpressionEvaluatingSplitter extends AbstractMessageProcessingSplitter {

    @SuppressWarnings({"unchecked", "rawtypes"})
    public ExpressionEvaluatingSplitter(Expression expression) {
        super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
    }

}

And the ExpressionEvaluatingMessageProcessor class:

/**
 * A {@link MessageProcessor} implementation that evaluates a SpEL expression
 * with the Message itself as the root object within the evaluation context.
 *
 * @author Mark Fisher
 * @author Artem Bilan
 * @since 2.0
 */
public class ExpressionEvaluatingMessageProcessor<T> extends AbstractMessageProcessor<T> {

    private final Expression expression;

    private final Class<T> expectedType;


  ...
    /**
     * Create an {@link ExpressionEvaluatingMessageProcessor} for the given expression
     * and expected type for its evaluation result.
     * @param expression The expression.
     * @param expectedType The expected type.
     */
    public ExpressionEvaluatingMessageProcessor(Expression expression, Class<T> expectedType) {
        Assert.notNull(expression, "The expression must not be null");
        try {
            this.expression = expression;
            this.expectedType = expectedType;
        }
        catch (ParseException e) {
            throw new IllegalArgumentException("Failed to parse expression.", e);
        }
    }

    /**
     * Processes the Message by evaluating the expression with that Message as the
     * root object. The expression evaluation result Object will be returned.
     * @param message The message.
     * @return The result of processing the message.
     */
    @Override
    public T processMessage(Message<?> message) {
        return this.evaluateExpression(this.expression, message, this.expectedType);
    }
...

}

With the expression from the sample, the result ends up being an ArrayList (which implements the Collection interface) containing a single LineIterator element.

The ExpressionEvaluatingSplitter is a subclass of AbstractMessageSplitter (through AbstractMessageProcessingSplitter), and it does NOT override the handleRequestMessage(Message<?> message) method, which looks like this:

public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
    protected final Object handleRequestMessage(Message<?> message) {
        Object result = this.splitMessage(message);
        // return null if 'null'
        if (result == null) {
            return null;
        }

        Iterator<Object> iterator;
        final int sequenceSize;
        if (result instanceof Collection) {
            Collection<Object> items = (Collection<Object>) result;
            sequenceSize = items.size();
            iterator = items.iterator();
        }
        else if (result.getClass().isArray()) {
            Object[] items = (Object[]) result;
            sequenceSize = items.length;
            iterator = Arrays.asList(items).iterator();
        }
        else if (result instanceof Iterable<?>) {
            sequenceSize = 0;
            iterator = ((Iterable<Object>) result).iterator();
        }
        else if (result instanceof Iterator<?>) {
            sequenceSize = 0;
            iterator = (Iterator<Object>) result;
        }
        else {
            sequenceSize = 1;
            iterator = Collections.singleton(result).iterator();
        }

        if (!iterator.hasNext()) {
            return null;
        }

        final MessageHeaders headers = message.getHeaders();
        final Object correlationId = headers.getId();
        final AtomicInteger sequenceNumber = new AtomicInteger(1);

        return new FunctionIterator<Object, AbstractIntegrationMessageBuilder<?>>(iterator,
                new Function<Object, AbstractIntegrationMessageBuilder<?>>() {
                    @Override
                    public AbstractIntegrationMessageBuilder<?> apply(Object object) {
                        return createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(),
                                sequenceSize);
                    }
                });
    }

    ...
}

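Since ArrayList is a Collection, the first branch is taken: the splitter iterates over the list itself, whose only element is the LineIterator. It never reaches the Iterator branch, so next() is never called on the LineIterator in the produceOutput(...) method. The output is therefore a single message whose payload is the LineIterator, rather than one message per line.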
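To make the branch-order effect concrete, here is a plain-Java mirror of the type checks in the handleRequestMessage code quoted above. It is not Spring code. `SplitDispatch`, `toIterator` and `countSplits` are hypothetical names, and the array branch is omitted for brevity. Because `Collection` is tested before `Iterator`, a one-element `List` that wraps an iterator yields a single item. The bare iterator yields one item per line.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;

// Hypothetical plain-Java mirror (not Spring code) of the type checks in
// AbstractMessageSplitter.handleRequestMessage(); the array branch is omitted.
final class SplitDispatch {

    private SplitDispatch() {
    }

    // Same order as the quoted code: Collection is tested before Iterator.
    @SuppressWarnings("unchecked")
    static Iterator<Object> toIterator(Object result) {
        if (result instanceof Collection) {
            return ((Collection<Object>) result).iterator();
        } else if (result instanceof Iterable) {
            return ((Iterable<Object>) result).iterator();
        } else if (result instanceof Iterator) {
            return (Iterator<Object>) result;
        }
        // Anything else becomes a single message.
        return Collections.singleton(result).iterator();
    }

    // Number of messages the splitter would emit for this result
    // (consumes the iterator it walks).
    static int countSplits(Object result) {
        int count = 0;
        Iterator<Object> it = toIterator(result);
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }
}
```

For example, `countSplits(Collections.singletonList(lines))` is 1 no matter how many lines `lines` holds, while `countSplits(lines)` counts every line.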

So why is the LineIterator coerced into an ArrayList? I believe there is a flaw in the ExpressionEvaluatingSplitter in that it always does this:

public ExpressionEvaluatingSplitter(Expression expression) {
    super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
}

I think that in Spring Integration 4 it should look at the type the expression evaluates to (either a List or an Iterator) and then call super(...). That might require reworking how the constructor is written, since the type would have to be decided before calling super(...), which Java doesn't allow.

What do you think?

倾城 Initia
Post #3 · 2019-03-02 02:48

Starting with Spring Integration 4.0, the <splitter> supports an Iterator as the payload to split. Hence, you can convert the inbound File to a LineIterator and process a message for each line in parallel, provided the output-channel of the <splitter> is an ExecutorChannel:

<splitter input-channel="splitChannel" output-channel="executorChannel"
          expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
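Conceptually, a splitter feeding an ExecutorChannel works like this: the thread running the splitter pulls lines one at a time, and each line is handed to a pool thread, much like the `<int:dispatcher task-executor="taskExecutor"/>` channel in the configuration quoted earlier. The sketch below shows that pattern with plain `java.util.concurrent` instead of Spring Integration, so `ParallelLineProcessor` and `process` are hypothetical names. With `threads = 10` it matches the question's 10 threads for 10K rows.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical plain-JDK analogue of <splitter> -> ExecutorChannel:
// one thread reads lines, a fixed pool of worker threads handles them.
final class ParallelLineProcessor {

    private ParallelLineProcessor() {
    }

    static void process(Reader source, int threads, Consumer<String> handler)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                final String row = line;                 // one "message" per line
                pool.execute(() -> handler.accept(row)); // dispatched to a pool thread
            }
        } finally {
            pool.shutdown();                             // accept no new rows; queued rows still run
            pool.awaitTermination(1, TimeUnit.MINUTES);  // arbitrary upper bound for this sketch
        }
    }
}
```

Rows are handled out of order, since up to 10 threads run concurrently. `Executors.newFixedThreadPool` uses an unbounded queue, so if reading is faster than processing, many rows can pile up in memory. A `ThreadPoolExecutor` with a bounded queue and `ThreadPoolExecutor.CallerRunsPolicy` is one common way to apply back-pressure.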