Maybe this is a recurrent question, but I need some customization with my context.
I'm using Spring Batch 3.0.1.RELEASE
I have a simple job with some steps. One step is a chunk like this:
<tasklet transaction-manager="myTransactionManager">
<batch:chunk reader="myReader" processor="myProcessor" writer="myWriter" commit-interval="${commit.interval}">
</batch:chunk>
<bean id="myProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<bean class="...MyFirstProcessor">
</bean>
<bean class="...MySecondProcessor">
</bean>
</list>
</property>
- Reader: JdbcCursorItemReader
- Processor: CompositeProcessor with my delegates
- Writer: CompositeWriter with my delegates
With this configuration, my job works perfectly.
Now, I want to convert this to a multi-threaded job. Following the documentation to basic multi-thread jobs, I included a SympleAsyncTaskExecutor in the tasklet, but it failed.
I have readed JdbcCursorItemReader does not work properly with multi-thread execution (is it right?). I have changed the reader to a JdbcPagingItemReader, and it has been a nightmare: job does not fail, writing process are ok, but data has mixed among the threads, and customer data were not right and coherent (customers have got services, addreses, etc. from others).
So, why does it happen? How could I change to a multi-thread job?
- Are the composite processor and writer right for multithread?
- How could I make a custom thread-safe composite processor?
- Maybe could it be the JDBC reader: Is there any thread-safe JDBC reader for multi-thread?
I'm very locked and confused with this, so any help would be very appreciated. Thanks a lot.
[EDIT - SOLVED]
Well, the right and suitable fix to my issue is to design the job for multithread and thread-safe execution from the beggining. It's habitual to practice first with one-thread step execution, to understand and know Spring Batch concepts; but if you consider you are leaving this phase behind, considerations like immutable objects, thread-safe list, maps, etc... must raise.
And the current fix in the current state of my issue has been the next I describe later. After test Martin's suggestions and taking into account Michael's guidelines, I have finally fix my issue as good as I could. The next steps aren't good practice, but I couldn't rebuild my job from the beggining:
- Change itemReader to JdbcPagingItemReader with setState to false.
- Change List by CopyOnWriteArrayList.
- Change HashMap by ConcurrentHashMap.
- In each delegated processor, get a new instance of every bean property (fortunately, there was only one injected bean) by passing the context (implements ApplicationContextAware) and getting a unique instance of the bean (configure every injected bean as scope="prototype").
So, if the delegated bean was:
<bean class="...MyProcessor">
<property name="otherBean" ref="otherBeanID" />
Change to:
<bean class="...MyProcessor">
<property name="otherBean" value="otherBeanID" />
And, inside MyProcessor, get a single instance for otherBeanID from the context; otherBeanID must be configurated with scope="protoype".
As I tell before, they're no good style, but it was my best option, and I can assert each thread has its own and different item instance and other bean instance.
It proves that some classes has not been well designed for a right multithread execution.
Martin, Michael, thanks for your support.
I hope it helps to anyone.
You have asked a lot in your question (in the future, please break this type of question up into multiple, more specific questions). However, item by item:
Is
JdbcCursorItemReader
thread-safe?As the documentation states, it is not. The reason for this is that the
JdbcCursorItemReader
wraps a singleResultSet
which is not thread safe.Are the composite processor and writer right for multithread?
The
CompositeItemProcessor
provided by Spring Batch is considered thread safe as long as the delegateItemProcessor
implementations are thread safe as well. You provide no code in relation to your implementations or their configurations so I can't verify their thread safety. However, given the symptoms you are describing, my hunch is that there is some form of thread safety issues going on within your code.You also don't identify what
ItemWriter
implementations or their configurations you are using so there may be thread related issues there as well.If you update your question with more information about your implementations and configurations, we can provide more insight.
How could I make a custom thread-safe composite processor?
There are two things to consider when implementing any
ItemProcessor
:ItemProcessor
implementation idempotent, this will prevent side effects from this multiple trips through a processor.Maybe could it be the JDBC reader: Is there any thread-safe JDBC reader for multi-thread?
As you have noted, the
JdbcPaginingItemReader
is thread safe and noted as such in the documentation. When using multiple threads, each chunk is executed in it's own thread. If you've configured the page size to match the commit-interval, that means each page is processed in the same thread.Other options for scaling a single step
While you went down the path of implementing a single, multi-threaded step, there may be better options. Spring Batch provides 5 core scaling options:
ItemProcessor
and compositeItemWriter
s in the same step, this may be something to explore (breaking your current composite scenarios into multiple, parallel steps).ItemProcessor
/ItemWriters
- This option allows you to execute the processor logic in a different thread. The processor spins the thread off and returns aFuture
to theAsyncItemWriter
which will block until theFuture
returns to be written.ItemProcessor
logic is the bottle neck in the flow.You can read about all of these options in the documentation for Spring Batch here: http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html
Thread safety is a complex problem. Just adding multiple threads to code that used to work in a single threaded environment will typically uncover issues in your code.