I have 10 records in my source table, and the chunk item-count is 3.
I have 2 partitions to process these 10 records: the first 5 records are processed in the first partition and the remaining records in the second. While processing the second partition I throw an exception, so the job fails in the 2nd chunk of the 2nd partition. When I restart the job, the failed partition processes all of its records again (both the first and the second chunk). A restart should resume from the last failed chunk instead of reprocessing every record in that partition. Can you please guide me on how to achieve this?
My JSL is like below:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Restartable job "readingfrom-db": a batchlet step ("runcache") followed by a
     partitioned chunk step ("readFromDB") that reads from one table and writes to another. -->
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
id="readingfrom-db" restartable="true" version="1.0" >
<!-- Job-level properties. Each one is taken from the job parameter of the same name,
     with the value after "?:" acting as the default when that parameter is absent. -->
<properties >
<!-- NOTE(review): numRec and chunkSize are not referenced anywhere else in this job XML;
     the chunk size below is the literal item-count="3". -->
<property name="numRec" value="#{jobParameters['numRec']}?:5;"/>
<property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/>
<!-- whereclauseFrom / whereclauseTo are handed to partition 0 / partition 1 as "modrec". -->
<property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/>
<property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>
<!-- JNDI names of the data sources: dsJNDI is used by the reader, dsJNDI1 by the writer. -->
<property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/>
<property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/>
<!-- Source table (reader) and target table (writer). -->
<property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/>
<property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/>
</properties>
<!-- Step 1: runs CacheRunnerBatchlet, then transitions to the readFromDB step. -->
<step id="runcache" next="readFromDB">
<batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" />
</step>
<!-- Step 2: partitioned read / process / write chunk step. -->
<step id="readFromDB">
<listeners>
<listener ref="com.cdc.dbreader.LogExceptionListener"/>
</listeners>
<!-- Item-based checkpointing: a checkpoint is taken after every 3 items. -->
<chunk item-count="3" checkpoint-policy="item">
<reader ref="com.cdc.dbreader.DBItemReader">
<properties >
<property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/>
<property name="tableName" value="#{jobProperties['tableName']}"/>
<!-- Resolved per partition from the "modrec" property of the partition plan below. -->
<property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>
</properties>
</reader>
<processor ref="com.cdc.dbreader.DBItemProcessor" />
<writer ref="com.cdc.dbreader.DBItemWriter">
<properties >
<property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/>
<property name="tableName" value="#{jobProperties['ProcesstableName']}"/>
</properties>
</writer>
</chunk>
<!-- Static partition plan: two partitions on two threads. -->
<partition>
<plan partitions="2" threads="2">
<!-- Partition 0 receives whereclauseFrom (default 5) as modrec. -->
<properties partition="0">
<property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>
</properties>
<!-- Partition 1 receives whereclauseTo (default 6) as modrec. -->
<properties partition="1">
<property name="modrec" value="#{jobProperties['whereclauseTo']}"/>
</properties>
</plan>
</partition>
</step>
</job>
My Item Reader is like below:
public class DBItemReader implements ItemReader {
@Inject
@BatchProperty
private String dsJNDI;
@Inject
@BatchProperty
private String whereclauseFrom;
@Inject
@BatchProperty
private String tableName;
private Connection conn =null;
private int totalRecords=0;
private DataSource ds = null;
List<RecObj> listRecObj=new ArrayList<RecObj>();
@Override
public Object readItem() throws SQLException {
if (listRecObj.size() == 0) {
return null;
} else {
RecObj rec =null;
Iterator<RecObj> iter =listRecObj.iterator();
while (iter.hasNext()) {
rec = iter.next();
if (Integer.parseInt(rec.getRec()) == 7) {
throw new IllegalStateException("Thrown Error");
}
iter.remove();
return rec;
}
return rec;
}
@Override
public void open(Serializable arg0) throws NamingException, SQLException {
ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
// System.out.println("whereclauseFrom: " + whereclauseFrom);
conn = ds.getConnection();
String sql ="";
if(Integer.parseInt(whereclauseFrom) == 5){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ whereclauseFrom;
}else if(Integer.parseInt(whereclauseFrom) == 6){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ whereclauseFrom;
}
PreparedStatement ps = conn.prepareStatement(sql);
ResultSet rs=ps.executeQuery();
while(rs.next()){
totalRecords++;
String rec=rs.getString("REC");
if(rec != null)
listRecObj.add(new RecObj(rec));
}
rs.close();
}
@Override
public void close() throws SQLException {
conn.close();
}
@Override
public Serializable checkpointInfo() {
return null;
}
}
}
My writer class is shown below:
/**
 * Chunk-step item writer that inserts the REC value of every item of a chunk
 * into the MOD_REC column of {@code tableName} as one JDBC batch.
 */
public class DBItemWriter extends AbstractItemWriter implements ItemWriter {
    @Inject
    @BatchProperty
    private String dsJNDI;

    @Inject
    @BatchProperty
    private String tableName;

    private DataSource ds = null;

    /**
     * Looks up the data source named by {@code dsJNDI}.
     *
     * @param arg0 checkpoint data supplied by the batch runtime; not used
     * @throws NamingException if the JNDI lookup fails
     */
    @Override
    public void open(Serializable arg0) throws NamingException {
        ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
    }

    /**
     * Inserts all items of the current chunk in a single batch.
     *
     * @param items the processed items of the chunk; each must be a {@code RecObj}
     * @throws BatchUpdateException if one of the batched inserts fails
     * @throws SQLException on any other database error
     */
    @Override
    public void writeItems(List<java.lang.Object> items) throws BatchUpdateException, SQLException {
        if (items == null || items.isEmpty()) {
            return; // nothing to insert, so do not borrow a connection
        }
        String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";
        // try-with-resources releases statement and connection even when the batch fails.
        try (Connection conn = ds.getConnection();
                PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Object obj : items) {
                RecObj v = (RecObj) obj;
                System.out.println("=======Writer values===="+v.getRec());
                ps.setString(1, v.getRec());
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}
My processor is shown below:
/**
 * Item processor that tags every record with a running sequence number:
 * the output record's value is the input value followed by the count of
 * items this processor instance has seen so far.
 */
public class DBItemProcessor implements ItemProcessor {
    /** Number of items handled by this instance; starts at zero. */
    Integer count=0;

    /**
     * Builds a new record whose value is the incoming value plus the current count.
     *
     * @param arg0 the item produced by the reader; must be a {@code RecObj}
     * @return a new {@code RecObj} carrying the suffixed value
     */
    @Override
    public Object processItem(Object arg0) {
        count = count + 1;
        final String original = ((RecObj) arg0).getRec();
        System.out.println("=========Processer Values===" + original);
        final String suffixed = original + count;
        return new RecObj(suffixed);
    }
}
Below is my Bean class
public class RecObj {
private String rec;
public RecObj(String rec) {
this.rec=rec;
}