I have some two sets of activity that needs to be executed in parallel. After their successful completion, I want to execute another set of activity. I used Task and it was working. But, after using the @Asynchronous annotation, I am getting the DecisionTaskTimedOut and none of the activity starts its execution. My aspectj configuration is working as i can see the following classes in my target:
AsyncWorkflowImpl$AjcClosure1.class
AsyncWorkflowImpl$AjcClosure3.class
AsyncWorkflowImpl$AjcClosure5.class
Asynchronous Version
public class AsyncWorkflowImpl implements AsyncWorkflow{
private AsyncActivitiesClient activitiesClient = new AsyncActivitiesClientImpl();
private Async2ActivitiesClient activitiesClient2 = new Async2ActivitiesClientImpl();
@Override
public void executeActivity() {
Promise<Integer> intermediateRes = null;
Promise<Integer> intermediateRes2 = null;
for(int i=0; i<5; i++){
intermediateRes = testIntermediate(Promise.asPromise(i), intermediateRes);
}
for(int i=0; i<5; i++){
intermediateRes2 = testIntermediate2(Promise.asPromise(i), intermediateRes2);
}
test(intermediateRes,intermediateRes2);
}
@Asynchronous
public Promise<Integer> testIntermediate(final Promise<Integer> i, Promise<Integer> res){
return activitiesClient.testAct1(i);
}
@Asynchronous
public Promise<Integer> testIntermediate2(final Promise<Integer> i, Promise<Integer> res){
return activitiesClient2.testAct1(i);
}
@Asynchronous
public void test(final Promise<Integer> res, final Promise<Integer> res2){
activitiesClient.testAct2();
}
}
Task Version
public class AsyncWorkflowImpl implements AsyncWorkflow{
private AsyncActivitiesClient activitiesClient = new AsyncActivitiesClientImpl();
private Async2ActivitiesClient activitiesClient2 = new Async2ActivitiesClientImpl();
@Override
public void executeActivity() {
Promise<Integer> intermediateRes = null;
Promise<Integer> intermediateRes2 = null;
Settable<Integer> finalRes = new Settable<Integer>();
Settable<Integer> finalRes2 = new Settable<Integer>();
for(int i=0; i<5; i++){
intermediateRes = testIntermediate(i, intermediateRes);
}
for(int i=0; i<5; i++){
intermediateRes2 = testIntermediate2(i, intermediateRes2);
}
finalRes.chain(intermediateRes);
finalRes2.chain(intermediateRes2);
test(finalRes,finalRes2);
}
public Promise<Integer> testIntermediate(final Integer i, Promise<Integer> res){
final Settable<Integer> tempRes = new Settable<Integer>();
new Task(res){
@Override
protected void doExecute() throws Throwable {
tempRes.chain(activitiesClient.testAct1(i));
}
};
return tempRes;
}
public Promise<Integer> testIntermediate2(final Integer i, Promise<Integer> res){
final Settable<Integer> tempRes = new Settable<Integer>();
new Task(res){
@Override
protected void doExecute() throws Throwable {
tempRes.chain(activitiesClient2.testAct1(i));
}
};
return tempRes;
}
public void test(final Promise<Integer> res, final Promise<Integer> res2){
new Task(res, res2){
@Override
protected void doExecute() throws Throwable {
activitiesClient.testAct2();
}
};
}
}
Is there any problem in aspectj weaving? Any suggestion is highly appreciated.
An @Asynchronous method is executed when all of its parameters that extend type Promise are ready. Your @Asynchronous methods are never executed because of the res parameter which is never ready. The solution is to annotate such parameters with @NoWait to inform the framework that these should not be waited.
The best way to troubleshoot workflows that appears "stuck" is by looking at their "asynchronous stack trace" that contains async stack traces of all outstanding tasks. Use WorkflowExecutionFlowThreadDumper to emit such a trace.