I am seeing data loss in Esper (v7.1.0) when the inbound thread pool is enabled. Here is a simple example that demonstrates this strange behaviour:
Configuration config = new Configuration();
// set up concurrent processing
config.getEngineDefaults().getThreading().setThreadPoolInbound(true);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
// simple schema
epService.getEPAdministrator().createEPL("create objectarray schema LogLine as (account_name string, value int) ");
// event for terminating context partition
epService.getEPAdministrator().createEPL("create schema TerminateEvent() ");
// Allocate a context partition for each account_name: start it on a LogLine event and terminate it on TerminateEvent.
epService.getEPAdministrator()
        .createEPL("create context NestedCtx " +
                "context InitCtx start LogLine end TerminateEvent ," +
                "context AccountCtx partition by account_name from LogLine");
// select to collect count of events per account_name.
EPStatement statement = epService.getEPAdministrator().createEPL(" context NestedCtx select account_name, count(*) as total from LogLine output last when terminated ");
// attach a listener that prints the results
statement.addListener(new UpdateListener() {
    @Override
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        for (EventBean eventBean : newEvents) {
            String properties = Arrays.stream(eventBean.getEventType().getPropertyNames()).map((prop) -> {
                return prop + " " + eventBean.get(prop);
            }).collect(Collectors.joining("; "));
            System.out.println(properties);
        }
    }
});
// send 3 LogLine events
epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
// send terminate event in order to get results
epService.getEPRuntime().sendEvent(Collections.emptyMap(), "TerminateEvent");
System.out.println("finish");
The problem is that the UpdateListener is never called when concurrent processing is enabled. The result is printed only when I disable the inbound thread pool. What is the reason for this behaviour?