I'm encountering an issue with two methods: the first calls the second in a loop, and the second creates a Future,
as follows:
public class WorkersCoordinator {
private static final Logger LOGGER =
LoggerFactory.getLogger(WorkersCoordinator.class);
private final Timeout timeout;
private final ActorSystem system;
private final List<Class<? extends BaseWorker>> workers;
private final Map<Class<? extends BaseWorker>, ActorRef> refMap;
private final WorkResultPackageQueue pkgQueue;
private final ActorFactory actorFactory;
@Autowired
public WorkersCoordinator(final ApplicationConfiguration config,
final ActorSystem system,
final ActorFactory actorFactory,
final WorkerFactory workerFactory,
final WorkResultPackageQueue pkgQueue) {
timeout = new Timeout(config.getTimeoutInMilliseconds(),
TimeUnit.MILLISECONDS);
this.system = system;
this.actorFactory = actorFactory;
this.pkgQueue = pkgQueue;
refMap = Maps.newHashMap();
workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
}
public void delegateWorkers() {
for (Class<? extends BaseWorker> worker : workers) {
if (refMap.containsKey(worker)) continue;
sendWork(worker);
}
}
private void sendWork(Class<? extends BaseWorker> worker) {
// GetDataActor extends AbstractActor
ActorRef actorRef = actorFactory.create(GetDataActor.class);
Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
responseRef.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
LOGGER.error("Worker {} encountered a problem - cancelling.",
worker.getSimpleName());
if (refMap.containsKey(worker)) {
refMap.remove(worker);
}
}
}, system.dispatcher());
responseRef.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
if (msg instanceof WorkResultPackage) {
final WorkResultPackage reportPackage = (WorkResultPackage) msg;
LOGGER.info(
"Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(reportPackage);
} else {
LOGGER.error(
"Expected to receive WorkResultPackage Object but received: {}",
msg.getClass());
throw new UnknownObjectReceived(msg);
}
}
}, system.dispatcher());
refMap.put(worker, actorRef);
}
}
The issue is that the closure I believe responseRef.onFailure creates doesn't behave as I would expect. If I call this with three workers, one of which I configure to fail, a failure is handled, but the log is inconsistent about which worker it reports as failed, even though it should always be the one I marked to fail. I'm new to this tech stack (Java, Scala Futures, and Akka) and to the established code base where I found this pattern, so I don't know whether I'm overlooking something or misunderstanding closures in Java/Scala Futures. The sticking point is that I need to report which worker failed and remove it from refMap so that it is no longer considered in process. Even stranger, all workers appear to complete and to be removed from refMap, even while the wrong worker is reported as failed.
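To make the expectation concrete, here is a minimal standalone sketch of the behavior I would expect. It is not my real code: it uses java.util.concurrent.CompletableFuture as a stand-in for the Scala Future returned by Patterns.ask, worker names instead of worker classes, and the class name, the shouldFail flag, and the reportedFailures list are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class CaptureExpectationDemo {
    // Hypothetical stand-ins for refMap and the error log.
    static final Map<String, String> refMap = new ConcurrentHashMap<>();
    static final List<String> reportedFailures = new CopyOnWriteArrayList<>();

    // Same shape as sendWork(...): one async request per call, with a
    // callback that captures this call's own 'worker' parameter.
    static CompletableFuture<String> sendWork(String worker, boolean shouldFail) {
        CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> {
            if (shouldFail) {
                throw new IllegalStateException("simulated failure");
            }
            return "result from " + worker;
        });
        // Registered before the callback so the sketch stays deterministic.
        refMap.put(worker, "actor-for-" + worker);
        return response.whenComplete((msg, failure) -> {
            if (failure != null) {
                reportedFailures.add(worker);
                refMap.remove(worker);
            }
        });
    }

    static List<String> run() {
        refMap.clear();
        reportedFailures.clear();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String worker : Arrays.asList("WorkerA", "WorkerB", "WorkerC")) {
            pending.add(sendWork(worker, worker.equals("WorkerB")));
        }
        for (CompletableFuture<String> f : pending) {
            try {
                f.join(); // wait until the callback for this request has run
            } catch (CompletionException expected) {
                // the simulated failure surfaces here; already recorded above
            }
        }
        return new ArrayList<>(reportedFailures);
    }

    public static void main(String[] args) {
        System.out.println("Reported as failed: " + run());
        System.out.println("Still in refMap: " + refMap.keySet());
    }
}
```

In this sketch only WorkerB is ever reported as failed and only WorkerB is removed from the map, which is the behavior I expected from the Akka version.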
Update: After having no luck getting an answer as to why the closure wasn't working, I did some investigating and found another post answering whether Java 8 even supports closures:
Does Java 8 Support Closures?
Short answer: I believe it does. However, that post spoke of final or effectively final variables, so I updated my code as follows. Hopefully this helps folks who understand closures explain why they don't work the way I'm used to (in C# and JavaScript). I'm only posting the updated sendWork(...) to highlight what I tried, to no avail.
private void sendWork(Class<? extends BaseWorker> worker) {
// GetDataActor extends AbstractActor
ActorRef actorRef = actorFactory.create(GetDataActor.class);
Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
Consumer<Throwable> onFailureClosure = (ex) -> {
LOGGER.error("Worker {} encountered a problem - cancelling.",
worker.getSimpleName());
if (refMap.containsKey(worker)) {
refMap.remove(worker);
}
};
responseRef.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
onFailureClosure.accept(failure);
}
}, system.dispatcher());
responseRef.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
if (msg instanceof WorkResultPackage) {
final WorkResultPackage reportPackage = (WorkResultPackage) msg;
LOGGER.info(
"Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(reportPackage);
} else {
LOGGER.error(
"Expected to receive WorkResultPackage Object but received: {}",
msg.getClass());
throw new UnknownObjectReceived(msg);
}
}
}, system.dispatcher());
refMap.put(worker, actorRef);
}
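For comparison, this is my understanding of how capturing an effectively final parameter should work in plain Java, with no Akka involved. It is only a sketch: the class and method names are hypothetical, and a Runnable stands in for OnFailure. It mirrors the shape of the update above, where an anonymous class delegates to a Consumer lambda.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class EffectivelyFinalDemo {
    // 'worker' is never reassigned, so it is effectively final and can be
    // captured by both the lambda and the anonymous class below.
    static Runnable makeHandler(String worker, List<String> log) {
        Consumer<String> closure = reason -> log.add(worker + ": " + reason);
        return new Runnable() {
            @Override
            public void run() {
                closure.accept("failed");
            }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Runnable> handlers = new ArrayList<>();
        for (String worker : Arrays.asList("WorkerA", "WorkerB", "WorkerC")) {
            handlers.add(makeHandler(worker, log));
        }
        // Invoke the handlers after the loop has finished, in reverse order,
        // to check that each one still sees the value from its own call.
        for (int i = handlers.size() - 1; i >= 0; i--) {
            handlers.get(i).run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here each handler reports the worker it was created for, regardless of when or in what order it runs, which matches what I'm used to from C# and JavaScript.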