
Esper data loss when inbound threading is enabled

Posted 2019-08-22 06:47

Question:

I found data loss in Esper (v7.1.0) when the inbound thread pool is enabled. Here is a simple example that demonstrates this strange behaviour:

    Configuration config = new Configuration();
    // set up concurrent processing
    config.getEngineDefaults().getThreading().setThreadPoolInbound(true);

    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);

    // simple schema
    epService.getEPAdministrator().createEPL("create objectarray schema LogLine as (account_name string, value int) ");
    // event for terminating context partition
    epService.getEPAdministrator().createEPL("create schema TerminateEvent() ");

    // Allocates context partition for each account_name. Start it on LogLine event and terminate on TerminateEvent.
    epService.getEPAdministrator()
            .createEPL("create context NestedCtx " + 
                       "context InitCtx start LogLine end TerminateEvent ," + 
                       "context AccountCtx partition by account_name from LogLine");
    // select to collect count of events per account_name.
    EPStatement statement = epService.getEPAdministrator().createEPL(" context NestedCtx select account_name, count(*) as total from LogLine output last when terminated ");
    // attach listener for printing results 
    statement.addListener(new UpdateListener() {

        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            for (EventBean eventBean : newEvents) {
                String properties = Arrays.stream(eventBean.getEventType().getPropertyNames()).map((prop) -> {
                    return prop + " " + eventBean.get(prop);
                }).collect(Collectors.joining("; "));
                System.out.println(properties);
            }

        }
    });
    //send 3 LogLine events
    epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
    epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
    epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");

    // send terminate event in order to get results
    epService.getEPRuntime().sendEvent(Collections.emptyMap(), "TerminateEvent");
    System.out.println("finish");

The problem is that the UpdateListener is not called when concurrent processing is enabled. The result is printed only when I disable the inbound thread pool. What is the reason for this behaviour?

Answer 1:

Inbound threading can change the order in which events get processed, as the JVM can process queued tasks in any order. In your example this means the TerminateEvent may be processed before some or all of the LogLine events. Therefore, when your use case requires ordered processing of events, inbound threading is not the right choice. Your application code can instead allocate its own queue and threads and assign events to those threads in a way that preserves order. For example, see the approach discussed in this StackOverflow question.
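As a rough illustration of the "own queue/threads" idea, here is a minimal sketch using only `java.util.concurrent`. The class `OrderedDispatcher` and its methods are hypothetical names invented for this example and are not part of Esper. It assumes each event can be assigned a routing key, and that the `sink` callback is where the application would call something like `epService.getEPRuntime().sendEvent(...)`. Each lane is a single-threaded executor, so events sent to the same lane are delivered in submission order.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper (not an Esper class): routes events to single-threaded lanes. */
class OrderedDispatcher<E> {
    private final ExecutorService[] lanes;
    private final Consumer<E> sink;

    /**
     * @param laneCount number of worker threads; 1 gives strict global ordering
     * @param sink      where events end up, e.g. a lambda that calls the engine's sendEvent
     */
    OrderedDispatcher(int laneCount, Consumer<E> sink) {
        this.lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            // One thread plus one FIFO queue per lane keeps submission order within the lane.
            lanes[i] = Executors.newSingleThreadExecutor();
        }
        this.sink = sink;
    }

    /** Events with the same key always go to the same lane, so their order is preserved. */
    void dispatch(Object key, E event) {
        int lane = Math.floorMod(key.hashCode(), lanes.length);
        lanes[lane].execute(() -> sink.accept(event));
    }

    /** Stops accepting events and waits for queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        boolean done = true;
        try {
            for (ExecutorService lane : lanes) {
                done &= lane.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return done;
    }
}
```

Note that ordering is only guaranteed within a lane. In the example from the question, the TerminateEvent carries no account_name and must be processed after all LogLine events, so a single lane (`laneCount = 1`) would be the safe choice there. More lanes only help when events for different keys are truly independent of each other.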