So, I have 2 partitions in a step which writes into a database. I want to record the number of rows written in each partition, get the sum, and print it to the log.
I was thinking of using a static variable in the Writer and use Step Context/Job Context to get it in afterStep() of the Step Listener. However, when I tried it I got null. I am able to get these values in close() of the Reader.
Is this the right way to go about it? Or should I use Partition Collector/Reducer/Analyzer?
I am using Java Batch in WebSphere Liberty, and I am developing in Eclipse.
I was thinking of using a static variable in the Writer and use Step Context/Job Context to get it in afterStep() of the Step Listener. However when I tried it I got null.
The ItemWriter might already be destroyed at this point, but I'm not sure.
Is this the right way to go about it?
Yes, it should be good enough. However, you need to ensure the total row count is shared across all partitions, because the batch runtime maintains a StepContext clone per partition. You should use JobContext instead.
I think using PartitionCollector and PartitionAnalyzer is a good choice, too. The PartitionCollector interface has a method collectPartitionData() to collect data coming from its partition. Once collected, the batch runtime passes this data to the PartitionAnalyzer, which analyzes it. Notice that there are:
- N PartitionCollector per step (1 per partition)
- N StepContext per step (1 per partition)
- 1 PartitionAnalyzer per step
The number of records written can be passed via the StepContext's transientUserData. Since each StepContext is reserved for its own step partition, the transient user data won't be overwritten by other partitions.
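Here's the implementation:
MyItemWriter:
    @Inject
    private StepContext stepContext;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        // ... write the items ...
        // Add this chunk's rows to the count not yet handed to the collector.
        Object userData = stepContext.getTransientUserData();
        int partRowCount = (userData != null ? (int) userData : 0) + items.size();
        stepContext.setTransientUserData(partRowCount);
    }
MyPartitionCollector:
    @Inject
    private StepContext stepContext;

    @Override
    public Serializable collectPartitionData() throws Exception {
        // get transient user data
        Object userData = stepContext.getTransientUserData();
        int partRowCount = userData != null ? (int) userData : 0;
        // Reset so the next call (the collector runs after every chunk
        // checkpoint) reports only the rows written since this one.
        stepContext.setTransientUserData(0);
        return partRowCount;
    }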
MyPartitionAnalyzer:
    private int rowCount = 0;

    @Override
    public void analyzeCollectorData(Serializable fromCollector) throws Exception {
        rowCount += (int) fromCollector;
        // Printed on every collector call, so this is the running total so far.
        System.out.printf("%d rows processed (all partitions).%n", rowCount);
    }
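Because the collector can be invoked after every chunk, it matters whether each call reports a cumulative count or only the rows written since the previous call: summing cumulative counts in the analyzer would overcount. Here is a minimal plain-Java simulation of the "report the delta, then reset" idea. It is only a sketch: PartitionCountSimulation, PartitionState and Analyzer are hypothetical stand-ins, not batch API types, and the partitions run one after another here rather than on separate threads.

```java
/** Plain-Java simulation of delta reporting; this is not the batch runtime. */
class PartitionCountSimulation {

    /** Hypothetical stand-in for one partition's transient user data. */
    static class PartitionState {
        private int pendingRows = 0;

        // Writer side: add the rows written in one chunk.
        void recordChunk(int rowsWritten) {
            pendingRows += rowsWritten;
        }

        // Collector side: hand over the rows not yet reported, then reset.
        int collect() {
            int delta = pendingRows;
            pendingRows = 0;
            return delta;
        }
    }

    /** Hypothetical stand-in for the single analyzer summing collector data. */
    static class Analyzer {
        private int rowCount = 0;

        void analyze(int fromCollector) {
            rowCount += fromCollector;
        }

        int total() {
            return rowCount;
        }
    }

    /** Runs one partition; each entry in chunkSizes is one chunk's row count. */
    static void runPartition(int[] chunkSizes, Analyzer analyzer) {
        PartitionState state = new PartitionState();
        for (int size : chunkSizes) {
            state.recordChunk(size);
            analyzer.analyze(state.collect()); // collector call after the chunk
        }
        analyzer.analyze(state.collect());     // an extra call at the end adds 0
    }

    /** Sums the rows over all partitions through one shared analyzer. */
    static int totalRows(int[][] partitions) {
        Analyzer analyzer = new Analyzer();
        for (int[] chunks : partitions) {
            runPartition(chunks, analyzer);
        }
        return analyzer.total();
    }

    public static void main(String[] args) {
        int[][] partitions = { {10, 10, 5}, {10, 3} };
        System.out.println(totalRows(partitions) + " rows written (all partitions)");
    }
}
```

With deltas, the total stays correct however many times the collector is called, since a call with nothing new to report simply contributes 0.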
Reference : JSR352 v1.0 Final Release.pdf
Let me offer a bit of an alternative on the accepted answer and add some comments.
PartitionAnalyzer variant - Use analyzeStatus() method
Another technique would be to use analyzeStatus, which only gets called at the end of each entire partition, and is passed the partition-level exit status.
public void analyzeStatus(BatchStatus batchStatus, String exitStatus)
In contrast, analyzeCollectorData, which the above answer uses, gets called at the end of each chunk on each partition.
E.g.
public class MyItemWriteListener extends AbstractItemWriteListener {
    @Inject
    StepContext stepCtx;

    // rows written so far by this listener instance
    private int newCount = 0;

    @Override
    public void afterWrite(List<Object> items) throws Exception {
        // update 'newCount' based on items.size()
        newCount += items.size();
        stepCtx.setExitStatus(Integer.toString(newCount));
    }
}
Obviously this only works if you weren't using the exit status for some other purpose. You can set the exit status from any artifact (though this freedom might be one more thing to have to keep track of).
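On the analyzer side, the exit statuses then need to be parsed and summed. The following is a standalone sketch of just that logic, not a real batch artifact: ExitStatusRowCounter is a hypothetical name, and the batch status is passed as a plain String so the example compiles without the batch API. In an actual job the method would sit in your PartitionAnalyzer and take a BatchStatus as in the signature above.

```java
/** Standalone sketch: summing per-partition row counts carried in exit statuses. */
class ExitStatusRowCounter {
    private int rowCount = 0;

    /**
     * Mirrors the shape of analyzeStatus. The batch status is ignored in this
     * sketch; a real analyzer might want to treat failed partitions differently.
     */
    void analyzeStatus(String batchStatus, String exitStatus) {
        try {
            rowCount += Integer.parseInt(exitStatus);
        } catch (NumberFormatException e) {
            // Exit status was null or not a row count; skip this partition.
        }
    }

    int total() {
        return rowCount;
    }

    public static void main(String[] args) {
        ExitStatusRowCounter counter = new ExitStatusRowCounter();
        counter.analyzeStatus("COMPLETED", "25"); // partition 0
        counter.analyzeStatus("COMPLETED", "13"); // partition 1
        System.out.println(counter.total() + " rows written (all partitions)");
    }
}
```

Since each partition reports once, its exit status can carry the cumulative count for that partition without any risk of double counting.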
Comments
The API is designed to facilitate an implementation dispatching individual partitions across JVMs (e.g., in Liberty you can see this here). But using a static ties you to a single JVM, so it's not a recommended approach.
Also note that both the JobContext and the StepContext are implemented in the "thread-local"-like fashion we see in batch.