Spring Batch: problems (mix data) when converting

Posted 2020-02-05 11:46

Maybe this is a recurring question, but my context requires some customization.

I'm using Spring Batch 3.0.1.RELEASE

I have a simple job with some steps. One step is a chunk like this:

    <tasklet transaction-manager="myTransactionManager">
        <batch:chunk reader="myReader" processor="myProcessor" writer="myWriter"
                     commit-interval="${commit.interval}"/>
    </tasklet>

    <bean id="myProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
        <property name="delegates">
            <list>
                <bean class="...MyFirstProcessor"/>
                <bean class="...MySecondProcessor"/>
            </list>
        </property>
    </bean>

  • Reader: JdbcCursorItemReader
  • Processor: CompositeProcessor with my delegates
  • Writer: CompositeWriter with my delegates

With this configuration, my job works perfectly.

Now I want to convert this into a multi-threaded job. Following the documentation on basic multi-threaded steps, I added a SimpleAsyncTaskExecutor to the tasklet, but it failed.
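The change amounted to adding a task executor to the tasklet, roughly like this (a sketch reconstructed from the description; the executor bean name and throttle limit here are assumptions):

```xml
<tasklet transaction-manager="myTransactionManager"
         task-executor="taskExecutor" throttle-limit="4">
    <batch:chunk reader="myReader" processor="myProcessor" writer="myWriter"
                 commit-interval="${commit.interval}"/>
</tasklet>

<bean id="taskExecutor"
      class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
```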

I have read that JdbcCursorItemReader does not work properly with multi-threaded execution (is that right?). I changed the reader to a JdbcPagingItemReader, and it has been a nightmare: the job does not fail and the writes succeed, but data gets mixed up among the threads, and the customer data is no longer correct or coherent (customers end up with services, addresses, etc. belonging to others).

So why does this happen? How can I convert this into a multi-threaded job?

  • Are the composite processor and writer suitable for multi-threaded execution?
  • How can I write a custom thread-safe composite processor?
  • Or could it be the JDBC reader: is there a thread-safe JDBC reader for multi-threaded steps?

I'm quite stuck and confused by this, so any help would be much appreciated. Thanks a lot.

[EDIT - SOLVED]

Well, the right and proper fix for my issue is to design the job for multi-threaded, thread-safe execution from the beginning. It is common to practice first with single-threaded step execution, to learn and understand Spring Batch concepts; but once you move past that phase, considerations like immutable objects, thread-safe lists, maps, etc. must come into play.

The fix in the current state of my issue is what I describe below. After testing Martin's suggestions and taking Michael's guidelines into account, I have finally fixed my issue as well as I could. The following steps are not good practice, but I could not rebuild my job from the beginning:

  • Change the itemReader to a JdbcPagingItemReader with saveState set to false.
  • Replace List with CopyOnWriteArrayList.
  • Replace HashMap with ConcurrentHashMap.
  • In each delegated processor, obtain a new instance of every bean property (fortunately, there was only one injected bean) by getting hold of the context (implementing ApplicationContextAware) and fetching a fresh instance of the bean (every injected bean configured with scope="prototype").

So, if the delegated bean was:

    <bean class="...MyProcessor">
        <property name="otherBean" ref="otherBeanID"/>
    </bean>

Change to:

    <bean class="...MyProcessor">
        <property name="otherBean" value="otherBeanID"/>
    </bean>

And, inside MyProcessor, fetch a fresh instance of otherBeanID from the context; otherBeanID must be configured with scope="prototype".
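Stripped of Spring specifics, the per-item fresh-instance idea looks like this (a hypothetical plain-Java sketch; in the real job the Supplier stands in for an ApplicationContext.getBean lookup on the prototype-scoped bean):

```java
import java.util.function.Supplier;

class OtherBean {
    // Mutable state like this is what made the shared singleton unsafe.
    private String lastCustomer;
    void handle(String customerId) { lastCustomer = customerId; }
    String lastCustomer() { return lastCustomer; }
}

class MyProcessor {
    // Stands in for context.getBean("otherBeanID", OtherBean.class) on a
    // scope="prototype" bean: every call yields a brand-new instance.
    private final Supplier<OtherBean> otherBeanFactory;

    MyProcessor(Supplier<OtherBean> otherBeanFactory) {
        this.otherBeanFactory = otherBeanFactory;
    }

    String process(String customerId) {
        OtherBean bean = otherBeanFactory.get(); // fresh instance per item, never shared
        bean.handle(customerId);
        return bean.lastCustomer();
    }
}
```

Because each item gets its own OtherBean, no mutable state is ever shared between threads.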

As I said before, this is not good style, but it was my best option, and I can assert that each thread has its own distinct item instance and bean instance.

It proves that some of my classes were not well designed for correct multi-threaded execution.

Martin, Michael, thanks for your support.

I hope it helps to anyone.

1 Answer

Root(大扎) · answered 2020-02-05 12:19

You have asked a lot in your question (in the future, please break this type of question up into multiple, more specific questions). However, item by item:

Is JdbcCursorItemReader thread-safe?
As the documentation states, it is not. The reason is that JdbcCursorItemReader wraps a single ResultSet, which is not thread safe.

Are the composite processor and writer right for multithread?
The CompositeItemProcessor provided by Spring Batch is considered thread safe as long as the delegate ItemProcessor implementations are thread safe as well. You provide no code in relation to your implementations or their configurations so I can't verify their thread safety. However, given the symptoms you are describing, my hunch is that there is some form of thread safety issues going on within your code.

You also don't identify what ItemWriter implementations or their configurations you are using so there may be thread related issues there as well.

If you update your question with more information about your implementations and configurations, we can provide more insight.

How could I make a custom thread-safe composite processor?
There are two things to consider when implementing any ItemProcessor:

  1. Make it thread safe: Following basic thread safety rules (read the book Java Concurrency In Practice for the bible on the topic) will allow you to scale your components by just adding a task executor.
  2. Make it idempotent: During skip/retry processing, items may be re-processed. By making your ItemProcessor implementation idempotent, this will prevent side effects from this multiple trips through a processor.
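A processor that follows both rules is typically stateless and side-effect free. A minimal plain-Java sketch (illustrative only; a real implementation would implement Spring Batch's ItemProcessor<I, O> interface):

```java
import java.util.Locale;

// Thread safe: no instance fields, so there is no shared mutable state
// for concurrent threads to corrupt.
// Idempotent: processing the same item twice yields the same result,
// so skip/retry re-processing causes no side effects.
class UppercaseNameProcessor {
    String process(String name) {
        if (name == null) {
            return null; // returning null filters the item out of the chunk
        }
        return name.trim().toUpperCase(Locale.ROOT);
    }
}
```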

Maybe could it be the JDBC reader: Is there any thread-safe JDBC reader for multi-thread?
As you have noted, the JdbcPagingItemReader is thread safe and documented as such. When using multiple threads, each chunk is executed in its own thread. If you configure the page size to match the commit-interval, that means each page is processed within a single thread.
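A hypothetical reader configuration aligning the two (bean names, the query provider, and the row mapper are assumptions, not code from the question):

```xml
<bean id="myReader" scope="step"
      class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider" ref="myQueryProvider"/>
    <property name="rowMapper" ref="myRowMapper"/>
    <!-- one page per chunk: page size equals the commit interval -->
    <property name="pageSize" value="${commit.interval}"/>
    <!-- no execution-context state shared across threads -->
    <property name="saveState" value="false"/>
</bean>
```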

Other options for scaling a single step
While you went down the path of implementing a single, multi-threaded step, there may be better options. Spring Batch provides 5 core scaling options:

  1. Multithreaded step - As you are trying right now.
  2. Parallel Steps - Using Spring Batch's split functionality you can execute multiple steps in parallel. Given that you're working with composite ItemProcessor and composite ItemWriters in the same step, this may be something to explore (breaking your current composite scenarios into multiple, parallel steps).
  3. Async ItemProcessor/ItemWriters - This option allows you to execute the processor logic in a different thread. The processor spins the thread off and returns a Future to the AsyncItemWriter, which blocks until the Future completes before writing.
  4. Partitioning - This is the division of the data into blocks called partitions that are processed in parallel by child steps. Each partition is processed by an actual, independent step, so using step-scoped components can prevent thread safety issues (each step gets its own instance). Partition processing can be performed either locally via threads or remotely across multiple JVMs.
  5. Remote Chunking - This option farms the processor logic out to other JVM processes. It should really only be used if the ItemProcessor logic is the bottleneck in the flow.
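As one concrete illustration, option 3 can be wired roughly like this (a sketch with assumed bean names; AsyncItemProcessor and AsyncItemWriter ship in the spring-batch-integration module):

```xml
<bean id="asyncProcessor"
      class="org.springframework.batch.integration.async.AsyncItemProcessor">
    <property name="delegate" ref="myProcessor"/>   <!-- runs on the executor -->
    <property name="taskExecutor" ref="taskExecutor"/>
</bean>

<bean id="asyncWriter"
      class="org.springframework.batch.integration.async.AsyncItemWriter">
    <property name="delegate" ref="myWriter"/>      <!-- unwraps the Futures -->
</bean>
```

The chunk then references asyncProcessor and asyncWriter in place of the originals, while the single-threaded reader stays untouched.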

You can read about all of these options in the documentation for Spring Batch here: http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html

Thread safety is a complex problem. Just adding multiple threads to code that used to work in a single threaded environment will typically uncover issues in your code.
