Persist issue with a spring batch ItemWriter using

2019-05-09 22:13发布

I have an issue with a spring batch ItemWriter that relies on a JPA repository in order to update data.

Here it is:

@Component
public class MessagesDigestMailerItemWriter implements ItemWriter<UserAccount> {

    private static final Logger log = LoggerFactory.getLogger(MessagesDigestMailerItemWriter.class);

    @Autowired
    private MessageRepository messageRepository;

    @Autowired
    private MailerService mailerService;

    @Override
    public void write(List<? extends UserAccount> userAccounts) throws Exception {
        log.info("Mailing messages digests and updating messages notification statuses");

        for (UserAccount userAccount : userAccounts) {
            if (userAccount.isEmailNotification()) {
                mailerService.mailMessagesDigest(userAccount);
            }
            for (Message message : userAccount.getReceivedMessages()) {
                message.setNotificationSent(true);
                messageRepository.save(message);//NOT SAVING!!
            }
        }
    }
}

Here is my Step configuration:

@Configuration
public class MailStepConfiguration {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private MessagesDigestMailerItemWriter itemWriter;

    @Bean
    public Step messagesDigestMailingStep() {
        return stepBuilderFactory.get("messagesDigestMailingStep")//
                .<UserAccount, UserAccount> chunk(1)//
                .reader(jpaPagingItemReader(entityManagerFactory))//
                .writer(itemWriter)//
                .build();
    }

    @Bean(destroyMethod = "")
    @StepScope
    public static ItemReader<UserAccount> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
        final JpaPagingItemReader<UserAccount> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setQueryString("SELECT ua FROM UserAccount ua JOIN FETCH ua.receivedMessages msg WHERE msg.notificationSent = false AND msg.messageRead = false");
        return reader;
    }

}

For completeness sake, here is my spring boot configuration:

@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
@ComponentScan("com.bignibou.batch.configuration")
public class Batch {
    public static void main(String[] args) {
        System.exit(SpringApplication.exit(new SpringApplicationBuilder(Batch.class).web(false).run(args)));
    }
}

and my datasource config:

@Configuration
@EnableJpaRepositories({ "com.bignibou.repository" })
@EntityScan("com.bignibou.domain")
public class DatasourceConfiguration {

    @Bean
    @ConfigurationProperties("spring.datasource.batch")
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.application")
    public DataSource applicationDatasource() {
        return DataSourceBuilder.create().build();
    }
}

I noticed that the execution flow get into the ItemWriter's write method and the messageRepository.save(message); does get executed but the data is not updated.

I suspect it is a transaction issue but I am not sure how to solve this problem...

edit: I forgot to mention that I have two Postgres databases:

  1. one for the job repository data
  2. another one for the application data.

I can confirm that data is written into the job repository database. The issue is with the application data. I am required to use distributed transactions bearing in mind the fact that I have two PG databases?

2条回答
祖国的老花朵
2楼-- · 2019-05-09 22:24

I opened an issue for this here:

https://jira.spring.io/browse/BATCH-2642

In principle, what helped us was to configure primary transaction manager like so:

@Configuration
public class JpaConfig {

    private final DataSource dataSource;

    @Autowired
    public JpaConfig(@Qualifier("dataSource") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Bean
    @Primary
    public JpaTransactionManager jpaTransactionManager() {
        final JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    }

}

And then using autowired instance of transaction manager when configuring the step, like so:

@Autowired
private PlatformTransactionManager transactionManager;

private TaskletStep buildTaskletStep() {
        return stepBuilderFactory.get("SendCampaignStep")
                    .<UserAccount, UserAccount>chunk(pushServiceConfiguration.getCampaignBatchSize())
                    .reader(userAccountItemReader)
                    .processor(userAccountItemProcessor)
                    .writer(userAccountItemWriter)
                    .transactionManager(transactionManager)
                    .build();
    }
}

Data is now correctly persisted, but there is still some magic I don't fully get...

查看更多
聊天终结者
3楼-- · 2019-05-09 22:27

You should @EnableTransactionManagement in your main class. I believe Spring Boot will create transaction manager for you, but if you want to override defaults, you may want to configure it explicitly.

Spring Batch than provides APIs for changing transaction attributes.

查看更多
登录 后发表回答