I would like to process CSV file concurrently using spring integration. Each row will be converted into individual message. So Assuming I am having 10K rows in CSV file , I would like to start 10 Thread , each row will be pass to this Thread. it would be great if anyone show me any sample example.
Thanks
I'm using Spring Integration 4.1.0 and have tried your suggestion however it doesn't seem to be working for me. I've gotten to look into this a bit today and now lean toward it being a Spring Integration 4.1.0 bug.
See if what I explain makes sense.
If you try this example, you'll see it will work (note this is NOT using your SpEL example):
With this
Splitter
implementation...Using your SpEL example:
what internally gets created by the parser is this (notice the
List.class
type passed into theExpressionEvaluatingMessageProcessor
constructor:And the
ExpressionEvaluatingMessageProcessor
class:What gets returned from the sample provided ends up being an
ArrayList
(which implements theCollection
interface) containing a singleLineIterator
element.The
ExpressionEvaluatingSplitter
is a subclass ofAbstractMessageSplitter
and it does NOT override thehandleRequestMessage(Message<?> message)
method.That method looks like this:
Since
ArrayList
is indeed aCollection
, it never gets to the logic where it sets the iterator, and therefore never calls thenext()
on the iterator in theproduceOutput(...)
method.So why is
LineIterator
cohearsed into anArrayList
? I believe there is a flaw in theExpressionEvaluatingSplitter
in that it always does this:I think in Spring Integration 4 it should now look at the type the expression evaluates to (either a
List
or anIterator
) then call the super (might need to rework how that is done since deciding type would be done before calling the super which JVM won't allow).What do you think?
Starting with Spring Integration 4.0 the
<splitter>
supportsIterator
aspayload
to split. Hence you can convert inboundFile
to theLineIterator
and process messages for each line in paralle, if anoutput-channel
of<splitter>
isExecutorChannel
: