Spring Statemachine Factory - state machines stay in memory

2019-07-18 18:58发布

问题:

I have used Spring Statemachine in quite a complex scenario. I will explain my problem using the simplest part of the state machine (SM). Refer to the image below; this is my main state machine.

The state circled in red points to the following sub-machine

So, as you can see, I have three actions: sendBasicTemplate, timeoutLogAction and processBasicTemplateReply. I will provide the related code segments and my configuration below.

What I have observed during this process is that the state machines created by the factory always stay in memory. There is some reference to them that I cannot identify. Is it that the SM doesn't stop, or is there something I'm doing wrong? Here's my code.

Configuration class

/**
 * State machine factory configuration for the machine with id {@code "cambodia"}.
 *
 * <p>The machine structure comes from the UML model at {@code classpath:stm/model.uml};
 * machines are configured with auto-startup enabled and share one listener that prints
 * every state change to standard output.
 */
@Configuration
@EnableStateMachineFactory
public class CambodiaStateMachine extends StateMachineConfigurerAdapter<String, String> {

    /**
     * Supplies the model factory that reads the machine definition from the UML file.
     *
     * @return a UML-backed model factory for {@code classpath:stm/model.uml}
     */
    @Bean
    public StateMachineModelFactory<String, String> modelFactory() {
        return new UmlStateMachineModelFactory("classpath:stm/model.uml");
    }

    /**
     * Supplies the listener registered on the machine configuration.
     *
     * @return a listener that prints the id of the state being entered
     */
    @Bean
    public StateMachineListener<String, String> listener() {
        return new StateMachineListenerAdapter<String, String>() {
            @Override
            public void stateChanged(State<String, String> previous, State<String, String> current) {
                // Only the target state is reported; the previous state is ignored.
                System.out.println("State change to " + current.getId());
            }
        };
    }

    /** Wires the UML model factory into the machine model. */
    @Override
    public void configure(StateMachineModelConfigurer<String, String> model) throws Exception {
        model.withModel().factory(modelFactory());
    }

    /** Sets the machine id, enables auto-startup and registers the state-change listener. */
    @Override
    public void configure(StateMachineConfigurationConfigurer<String, String> config) throws Exception {
        config.withConfiguration()
                .machineId("cambodia")
                .autoStartup(true)
                .listener(listener());
    }
}

Methods: 1. This is how my events are fed to the machine and where new SM instances are created. I take my events from a queue.

/**
 * Handles one event message received from the {@code sims.events.mq} queue.
 *
 * <p>Steps, in order: obtain a machine from the factory, attach a listener that persists the
 * machine to Redis on every state-context callback, restore the stored state for the
 * subscriber (or start the machine when nothing is stored), copy message data into the
 * machine's extended state, and finally pass the message to {@code feedMachine}.
 *
 * <p>Every exception is caught and logged here; nothing propagates to the caller.
 *
 * @param message the event message; header {@code imei} is required and is used to build the
 *                Redis key, headers {@code templates} and {@code ttl} are copied when present,
 *                and the payload is stored as the {@code eventName} variable
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "sims.events.mq", durable = "true"), exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "sims.events.mq.xch", ignoreDeclarationExceptions = "true", durable = "true"), key = "events"))
    public void process(GenericMessage<String> message) {

        try {

            // The IMEI header is read once and used both for the lookup and for the Redis key.
            String imei = (String) message.getHeaders().get("imei");
            if (imei == null) {
                // Without an IMEI the key would be "testprefixSw:null", shared by every such message.
                log.error("Dropping event without 'imei' header, payload: " + message.getPayload());
                return;
            }
            String storeKey = "testprefixSw:" + imei;

            // The result is not used in this method. NOTE(review): the call is kept in case it
            // validates the subscriber or has side effects - remove it if it is a pure lookup.
            subscriberService.findSubscriber(imei);

            // quickly create 'new' state machine
            // NOTE(review): the machine is never stopped in this method. If instances are seen
            // to stay reachable, check whether the machine must be stopped once the event is
            // handled; stopping presumably also cancels pending timer transitions - confirm first.
            StateMachine<String, String> stateMachine = factory.getStateMachine();

            stateMachine.addStateListener(new CompositeStateMachineListener<String, String>() {

                @Override
                public void stateContext(StateContext<String, String> context) {

                    // Nothing to persist until the "imei" variable has been stored below.
                    String persistedUser = (String) context.getExtendedState().getVariables().get("imei");
                    if (persistedUser == null) {
                        return;
                    }

                    log.info(context.getStage().toString() + "**********" + stateMachine.getState());
                    try {
                        redisStateMachinePersister.persist(context.getStateMachine(), "testprefixSw:" + persistedUser);
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                }

            });

            // restore from persistent
            log.info(imei);

            // attempt restoring only if key is exist
            if (redisTemplate.hasKey(storeKey)) {
                System.out.println("************************  prefix exists...restoring");
                resetStateMachineFromStore(stateMachine, imei);
            } else {
                stateMachine.start();
                System.out.println("************************  No prefix");

            }

            log.info("Payload == > " + message.getPayload());

            try {
                stateMachine.getExtendedState().getVariables().put("imei", imei);
                stateMachine.getExtendedState().getVariables().put("fromState", stateMachine.getState().getId());
                stateMachine.getExtendedState().getVariables().put("eventName", message.getPayload());

                // Optional headers: each is read once and copied only when present.
                Object templates = message.getHeaders().get("templates");
                if (templates != null) {
                    stateMachine.getExtendedState().getVariables().put("templates", templates);
                }

                Object ttl = message.getHeaders().get("ttl");
                if (ttl != null) {
                    stateMachine.getExtendedState().getVariables().put("ttl", ttl);
                }
            } catch (Exception e) {
                // Best effort: a failure to populate variables is logged and the event is still fed.
                log.error(e.getMessage(), e);
            }

            // check if state is properly restored...
            log.info("Current State " + stateMachine.getState().toString());

            feedMachine(stateMachine, imei, message);

            log.info("handler exited");

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        // TODO: save persistent state..
    }


/**
 * Sends the event message to the machine, prints the resulting state and persists the
 * machine to Redis under {@code "testprefixSw:" + user}.
 *
 * @param stateMachine machine that receives the event
 * @param user         identifier appended to the Redis key prefix
 * @param event        message forwarded to the machine unchanged
 * @throws Exception if persisting the machine fails
 */
private void feedMachine(StateMachine<String, String> stateMachine, String user, GenericMessage<String> event)
        throws Exception {
    final String storeKey = "testprefixSw:" + user;

    stateMachine.sendEvent(event);

    final String currentState = stateMachine.getState().toString();
    System.out.println("persist machine --- > state :" + currentState);

    redisStateMachinePersister.persist(stateMachine, storeKey);
}

/**
 * Restores the machine from the Redis entry {@code "testprefixSw:" + user} and prints the
 * state it ends up in.
 *
 * @param stateMachine machine to restore into
 * @param user         identifier appended to the Redis key prefix
 * @return the machine returned by the persister's {@code restore} call
 * @throws Exception if the restore fails
 */
private StateMachine<String, String> resetStateMachineFromStore(StateMachine<String, String> stateMachine,
        String user) throws Exception {

    final String storeKey = "testprefixSw:" + user;
    final StateMachine<String, String> restored = redisStateMachinePersister.restore(stateMachine, storeKey);

    final String restoredState = restored.getState().toString();
    System.out.println("restore machine --- > state :" + restoredState);

    return restored;
}

Actions

/**
 * Action that sends the basic template.
 *
 * <p>It reads the {@code imei} extended-state variable and the {@code template} message
 * header, logs them, then calls {@code findTemplateNSend}, {@code xbossBalanceCheck} (with
 * {@code "Direct Query"}) and {@code setRiskyState}, in that order.
 *
 * @return the action bean
 */
@Bean
    public Action<String, String> sendBasicTemplate() {

        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {
                // MP: extended-state variables are the right place to read the IMEI from
                final String subscriberImei = (String) context.getExtendedState().getVariables().get("imei");
                final String templateHeader = (String) context.getMessageHeader("template");

                log.info("sending basic template " + templateHeader + " to " + subscriberImei);

                findTemplateNSend(context, templateHeader, subscriberImei);
                xbossBalanceCheck(context, subscriberImei, "Direct Query");

                // Same key format that the reply and timeout actions later pass to clearRiskyStateIfSet.
                final String riskyKey = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + subscriberImei;
                setRiskyState(context, riskyKey, 0);
            }
        };
    }

    /**
     * Action that processes the reply to the basic template.
     *
     * <p>It saves the direct values for the subscriber, clears the "risky" key set for this
     * IMEI and, when that key was present, sends a success entry to the subscriber event log.
     * It always finishes by sending {@code SEQUENCE_COMPLETE} to the machine.
     *
     * @return the action bean
     */
    @Bean
    public Action<String, String> processBasicTemplateReply() {

        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {

                log.info("Result for basic template processing started");
                log.info(context.getStateMachine().getState().getIds().toString());
                String imeiNo = (String) context.getExtendedState().getVariables().get("imei");

                saveDirectValues(context, imeiNo);

                String fromState = (String) context.getExtendedState().getVariables().get("fromState");
                String eventName = (String) context.getExtendedState().getVariables().get("eventName");

                // Read the header once and accept any numeric type; a missing or
                // non-numeric header falls back to 0 instead of failing on a Long cast.
                Object processId = context.getMessageHeader("processId");
                long trId = processId instanceof Number ? ((Number) processId).longValue() : 0L;

                String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
                log.info("*Going to delete if exists key ==>" + key);

                // The success entry is logged only when the risky key was present.
                if (clearRiskyStateIfSet(context, key)) {
                    log.info("------------------------------Jedis Exists");
                    sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getState().getId(), trId, eventName, false, "Query Event Success");
                }

                // mark as success sent
                context.getStateMachine().sendEvent("SEQUENCE_COMPLETE");
            }
        };
    }


/**
 * Action that records a timed-out direct query.
 *
 * <p>It clears the "risky" key set for this IMEI and, when that key was present, sends a
 * failure entry to the subscriber event log and raises a normal-priority alert.
 *
 * @return the action bean
 */
@Bean
    public Action<String, String> timeoutLogAction() {

        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {

                // The IMEI is read through the machine's extended state here, while the other
                // variables use context.getExtendedState(). NOTE(review): the reason for the
                // difference is not visible in this file - confirm before unifying.
                String imeiNo = (String) context.getStateMachine().getExtendedState().getVariables().get("imei");

                String fromState = (String) context.getExtendedState().getVariables().get("fromState");
                String eventName = (String) context.getExtendedState().getVariables().get("eventName");

                // Read the header once and accept any numeric type; a missing or
                // non-numeric header falls back to 0 instead of failing on a Long cast.
                Object processId = context.getMessageHeader("processId");
                long trId = processId instanceof Number ? ((Number) processId).longValue() : 0L;

                String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
                log.info("*Going to delete if exists key ==>" + key);

                // The failure entry and alert are sent only when the risky key was present.
                if (clearRiskyStateIfSet(context, key)) {
                    log.info("------------------------------Jedis Exists at timeout. Event Failed");
                    // NOTE(review): the third argument is the machine id (getId()), whereas
                    // processBasicTemplateReply passes the current state id
                    // (getState().getId()) in the same position - confirm which is intended.
                    sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getId(), trId, eventName, true, "Direct Query Failed due to Timeout");
                    sendAlert(imeiNo, EventPriority.NORMAL, "Direct Query Failed due to Timeout");
                }

            }
        };
    }

So, based on the above, is there anything I'm missing that prevents the created state machines from being garbage collected? Or is there any other explanation as to why memory is consumed with each request and never released?