I am fairly new to Spring and Spring Batch, so feel free to ask clarifying questions.
I am seeing an issue with Spring Batch that I cannot recreate in our test or local environments. We have a daily job that connects to Websphere MQ via JMS and retrieves a set of records. This job uses the out-of-the-box JMS ItemReader. We implement our own ItemProcessor, but it doesn't do anything special other than logging. There are no filters or processing that should affect incoming records.
The problem is that out of the 10,000+ daily records on MQ, usually only about 700 get logged in the ItemProcessor, even though all records are successfully pulled off the queue. The exact number is different each time and follows no apparent pattern. By comparing the log files against the list of records in MQ, we can see that a seemingly random subset of records is being "processed" by our job: the first record might get picked up, then 50 are skipped, then 5 in a row are processed, and so on. The pattern is different each time the job runs, and no exceptions are logged.
When running the same app on localhost and in test with the same data set, all 10,000+ records are successfully retrieved and logged by the ItemProcessor. The job runs for between 20 and 40 seconds in Production (the duration also varies), but in test and local it takes several minutes to complete, which makes sense since it is handling so many more records.
So this is one of those tough issues to troubleshoot, since we cannot recreate it. One idea is to implement our own ItemReader with additional logging so that we can see whether records are getting lost before or after the reader; all we know now is that only a subset of records is being handled by the ItemProcessor. But even that will not solve our problem, and it will be somewhat time-consuming to implement considering it is not even a solution.
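To make the logging-reader idea concrete, here is a minimal sketch of a wrapper that counts and logs every item a delegate reader returns. It is not tied to our actual job: `SimpleReader` is a hypothetical stand-in that only mirrors the shape of Spring Batch's `ItemReader.read()` (returning `null` when the input is exhausted), and `readerOver` is an in-memory substitute for the JMS-backed reader, so the example runs without Spring on the classpath.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CountingReaderSketch {

    /** Hypothetical stand-in mirroring ItemReader#read(): null means "no more items". */
    interface SimpleReader<T> {
        T read() throws Exception;
    }

    /** Wraps any reader, logs and counts each non-null item, and returns it unchanged. */
    static final class CountingReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;
        private final AtomicLong itemsRead = new AtomicLong();

        CountingReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T read() throws Exception {
            T item = delegate.read();
            if (item != null) {
                long n = itemsRead.incrementAndGet();
                System.out.println("read #" + n + ": " + item);
            }
            return item;
        }

        long getItemsRead() {
            return itemsRead.get();
        }
    }

    /** In-memory reader used here in place of the JMS-backed one (assumption for the demo). */
    static <T> SimpleReader<T> readerOver(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) throws Exception {
        CountingReader<String> reader =
                new CountingReader<>(readerOver(Arrays.asList("msg-1", "msg-2", "msg-3")));
        // Drain the reader the way a step would: keep reading until null.
        while (reader.read() != null) {
            // nothing to do; the wrapper logs each item
        }
        System.out.println("total read = " + reader.getItemsRead());
    }
}
```

If the count logged by such a wrapper matches the number of messages taken off the queue, the loss is happening after the reader; if it matches the roughly 700 seen in the processor, the records never reach the reader in this JVM. In a real job the same idea could probably be applied without replacing the stock reader, since Spring Batch provides an `ItemReadListener` whose `afterRead` callback receives each item.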
Has anyone else seen an issue like this? Any possible ideas or troubleshooting suggestions would be greatly appreciated. Here are some of the jar version numbers we are using for reference.
- Spring - 3.0.5.RELEASE
- Spring Integration - 2.0.3.RELEASE
- Spring Batch - 2.1.7.RELEASE
- ActiveMQ - 5.4.2
- WebSphere MQ - 7.0.1
Thanks in advance for your input.
EDIT: Per request, code for processor:
    public SMSReminderRow process(Message message) throws Exception {
        SMSReminderRow retVal = new SMSReminderRow();
        LOGGER.debug("Converting JMS Message to ClaimNotification");
        ClaimNotification notification = createClaimNotificationFromMessage(message);

        retVal.setShortCode(BatchCommonUtils
                .parseShortCodeFromCorpEntCode(notification.getCorpEntCode()));
        retVal.setUuid(UUID.randomUUID().toString());
        retVal.setPhoneNumber(notification.getPhoneNumber());
        retVal.setMessageType(EventCode.SMS_CLAIMS_NOTIFY.toString());

        DCRContent content = tsContentHelper.getTSContent(
                Calendar.getInstance().getTime(),
                BatchCommonConstants.TS_TAG_CLAIMS_NOTIFY,
                BatchCommonConstants.TS_TAG_SMSTEXT_TYP);
        String claimsNotificationMessage = formatMessageToSend(content.getContent(),
                notification.getCorpEntCode());
        retVal.setMessageToSend(claimsNotificationMessage);
        retVal.setDateTimeToSend(TimeUtils
                .getGMTDateTimeStringForDate(new Date()));

        LOGGER.debug(
                "Finished processing claim notification for {}. Writing row to file.",
                notification.getPhoneNumber());
        return retVal;
    }
JMS config:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
    <bean id="claimsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="jms/SMSClaimNotificationCF" />
        <property name="lookupOnStartup" value="true" />
        <property name="cache" value="true" />
        <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
    </bean>

    <bean id="jmsDestinationResolver"
        class="org.springframework.jms.support.destination.DynamicDestinationResolver">
    </bean>

    <bean id="jmsJndiDestResolver"
        class="org.springframework.jms.support.destination.JndiDestinationResolver"/>

    <bean id="claimsJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="claimsQueueConnectionFactory" />
        <property name="defaultDestinationName" value="jms/SMSClaimNotificationQueue" />
        <property name="destinationResolver" ref="jmsJndiDestResolver" />
        <property name="pubSubDomain">
            <value>false</value>
        </property>
        <property name="receiveTimeout">
            <value>20000</value>
        </property>
    </bean>