I am using Spring State Machine in a fairly complex scenario. I will explain my problem using the simplest part of the state machine (SM). The image below shows my main state machine.
The state circled in red points to the following sub-machine.
As you can see, I have three actions: sendBasicTemplate, timeoutLogAction and processBasicTemplateReply. The related code segments and my configuration are below.
What I have observed is that the state machines created by the factory always stay in memory. Something must still hold a reference to them, but I cannot work out what. Is it that the SM never stops, or am I doing something else wrong? Here's my code.
Configuration class
@Configuration
@EnableStateMachineFactory
public class CambodiaStateMachine extends StateMachineConfigurerAdapter<String, String> {
@Override
public void configure(StateMachineModelConfigurer<String, String> model) throws Exception {
model
.withModel()
.factory(modelFactory());
}
@Override
public void configure(StateMachineConfigurationConfigurer<String, String> config) throws Exception {
config
.withConfiguration()
.machineId("cambodia")
.autoStartup(true)
.listener(listener());
}
@Bean
public StateMachineListener<String, String> listener() {
return new StateMachineListenerAdapter<String, String>() {
@Override
public void stateChanged(State<String, String> from, State<String, String> to) {
System.out.println("State change to " + to.getId());
}
};
}
@Bean
public StateMachineModelFactory<String, String> modelFactory() {
return new UmlStateMachineModelFactory("classpath:stm/model.uml");
}
}
Methods: This is how my events are fed to the machine and where new SM instances are created. I take the events from a queue.
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "sims.events.mq", durable = "true"), exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "sims.events.mq.xch", ignoreDeclarationExceptions = "true", durable = "true"), key = "events"))
public void process(GenericMessage<String> message) {
try {
String imei = (String) message.getHeaders().get("imei");
Subscriber subscriber = subscriberService.findSubscriber(imei);
// quickly create 'new' state machine
StateMachine<String, String> stateMachine = factory.getStateMachine();
stateMachine.addStateListener(new CompositeStateMachineListener<String, String>() {
@Override
public void stateContext(StateContext<String, String> arg0) {
String user = (String) arg0.getExtendedState().getVariables().get("imei");
if (user == null) {
return;
}
log.info(arg0.getStage().toString() + "**********" + stateMachine.getState());
try {
redisStateMachinePersister.persist(arg0.getStateMachine(), "testprefixSw:" + user);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
// restore from the persistent store
String user = (String) message.getHeaders().get("imei");
log.info(user);
// attempt to restore only if the key exists
if (redisTemplate.hasKey("testprefixSw:" + user)) {
System.out.println("************************ prefix exists...restoring");
resetStateMachineFromStore(stateMachine, user);
} else {
stateMachine.start();
System.out.println("************************ No prefix");
}
log.info("Payload == > " + message.getPayload());
try {
stateMachine.getExtendedState().getVariables().put("imei", user);
stateMachine.getExtendedState().getVariables().put("fromState", stateMachine.getState().getId());
stateMachine.getExtendedState().getVariables().put("eventName", message.getPayload());
if (message.getHeaders().get("templates") != null) {
stateMachine.getExtendedState().getVariables().put("templates", message.getHeaders().get("templates"));
}
if (message.getHeaders().get("ttl") != null) {
stateMachine.getExtendedState().getVariables().put("ttl", message.getHeaders().get("ttl"));
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
// check if state is properly restored...
log.info("Current State " + stateMachine.getState().toString());
feedMachine(stateMachine, user, message);
log.info("handler exited");
} catch (Exception e) {
log.error(e.getMessage(), e);
}
// TODO: save persistent state
}
private void feedMachine(StateMachine<String, String> stateMachine, String user, GenericMessage<String> event)
throws Exception {
stateMachine.sendEvent(event);
System.out.println("persist machine --- > state :" + stateMachine.getState().toString());
redisStateMachinePersister.persist(stateMachine, "testprefixSw:" + user);
}
private StateMachine<String, String> resetStateMachineFromStore(StateMachine<String, String> stateMachine,
String user) throws Exception {
StateMachine<String, String> machine = redisStateMachinePersister.restore(stateMachine, "testprefixSw:" + user);
System.out.println("restore machine --- > state :" + machine.getState().toString());
return machine;
}
Actions
@Bean
public Action<String, String> sendBasicTemplate() {
// Action handler...
return new Action<String, String>() {
@Override
public void execute(StateContext<String, String> context) {
// MP: variables are the right way to do
String imeiNo = (String) context.getExtendedState().getVariables().get("imei");
String template = (String) context.getMessageHeader("template");
log.info("sending basic template " + template + " to " + imeiNo);
findTemplateNSend(context, template, imeiNo);
xbossBalanceCheck(context, imeiNo, "Direct Query");
setRiskyState(context, "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo, 0);
}
};
}
@Bean
public Action<String, String> processBasicTemplateReply() {
// Action handler...
return new Action<String, String>() {
@Override
public void execute(StateContext<String, String> context) {
log.info("Result for basic template processing started");
log.info(context.getStateMachine().getState().getIds().toString());
String imeiNo = (String) context.getExtendedState().getVariables().get("imei");
saveDirectValues(context, imeiNo);
String fromState = (String) context.getExtendedState().getVariables().get("fromState");
String eventName = (String) context.getExtendedState().getVariables().get("eventName");
long trId = (Long) context.getMessageHeader("processId") != null? (Long) context.getMessageHeader("processId") : 0;
String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
log.info("*Going to delete if exists key ==>" + key);
if (clearRiskyStateIfSet(context, key)) {
log.info("------------------------------Jedis Exists");
sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getState().getId(), trId, eventName, false, "Query Event Success");
}
// mark as success sent
context.getStateMachine().sendEvent("SEQUENCE_COMPLETE");
}
};
}
@Bean
public Action<String, String> timeoutLogAction() {
// Action handler...
return new Action<String, String>() {
@Override
public void execute(StateContext<String, String> context) {
// log.info("timeout log Action");
String imeiNo = (String) context.getStateMachine().getExtendedState().getVariables().get("imei");
// String imeiNo = (String)
// context.getExtendedState().getVariables().get("imei");
String fromState = (String) context.getExtendedState().getVariables().get("fromState");
String eventName = (String) context.getExtendedState().getVariables().get("eventName");
long trId = (Long) context.getMessageHeader("processId") != null ? (Long) context.getMessageHeader("processId") : 0;
String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
log.info("*Going to delete if exists key ==>" + key);
if (clearRiskyStateIfSet(context, key)) {
log.info("------------------------------Jedis Exists at timeout. Event Failed");
sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getId(), trId, eventName, true, "Direct Query Failed due to Timeout");
sendAlert(imeiNo, EventPriority.NORMAL, "Direct Query Failed due to Timeout");
}
}
};
}
So, based on the above, is there anything I'm missing that prevents the created state machines from being garbage collected? Or is there any other explanation for why memory consumption grows with each request and is never released?
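To make the "SM doesn't stop" suspicion concrete, here is a minimal sketch of the lifecycle I think each message should follow: obtain a machine, start it, send the event, and always stop it in a finally block. This is plain Java, not Spring code. Machine and RecordingMachine are hypothetical stand-ins for the start/sendEvent/stop subset of StateMachine<String, String>, and handle is a hypothetical helper. Whether stopping alone is enough to let the real instances be collected is exactly what I am unsure about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class MachineLifecycleSketch {

    /** Hypothetical stand-in for the start/sendEvent/stop subset of StateMachine<String, String>. */
    interface Machine {
        void start();
        boolean sendEvent(String event);
        void stop();
    }

    /** Hypothetical test double that records every lifecycle call. */
    static class RecordingMachine implements Machine {
        final List<String> calls = new ArrayList<>();
        boolean running;

        @Override
        public void start() {
            running = true;
            calls.add("start");
        }

        @Override
        public boolean sendEvent(String event) {
            calls.add("event:" + event);
            if (event.isEmpty()) {
                throw new IllegalArgumentException("empty event");
            }
            return running;
        }

        @Override
        public void stop() {
            running = false;
            calls.add("stop");
        }
    }

    /** Handles one message: the machine is stopped even if sending the event throws. */
    static boolean handle(Supplier<? extends Machine> factory, String event) {
        Machine machine = factory.get();
        try {
            machine.start();
            return machine.sendEvent(event);
        } finally {
            // Assumption: a machine that is never stopped may stay reachable.
            machine.stop();
        }
    }

    public static void main(String[] args) {
        RecordingMachine machine = new RecordingMachine();
        boolean accepted = handle(() -> machine, "SEQUENCE_COMPLETE");
        System.out.println(accepted + " " + machine.calls + " running=" + machine.running);
    }
}
```

In my real handler the equivalent change would be wrapping the body of process(...) in try/finally and calling stateMachine.stop() there, after the final persist.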