Unresponsive actor system: ThreadPoolExecutor dispatcher stays at core pool size

Posted 2019-06-21 08:24

Update: I've found that my program remains responsive if I set the ThreadPoolExecutor's core pool size to be the same as the max pool size (29 threads). However, if I set the core pool size to 11 and the max pool size to 29, the actor system only ever creates 11 threads. How can I configure the ActorSystem / ThreadPoolExecutor to keep creating threads beyond the core thread count, up to the max thread count? I would prefer not to set the core thread count to the max thread count, as I only need the extra threads during job cancellation (which should be a rare event).


I have a batch program running against an Oracle database, implemented using Java/Akka typed actors with the following actors:

  1. BatchManager is in charge of talking to the REST controller. It manages a Queue of uninitialized batch jobs; when an uninitialized batch job is polled from the queue, it is turned into a JobManager actor and executed.
  2. JobManager maintains a queue of stored procedures and a pool of Workers; it initializes each Worker with a stored procedure, and when a Worker finishes it sends the procedure's result to the JobManager, which then sends the Worker another stored procedure. The batch terminates when the job queue is empty and all Workers are idle, at which point the JobManager reports its results to the BatchManager, shuts down its Workers (via TypedActor.context().stop()), and then shuts itself down. The JobManager has a Promise<Status> completion that is completed when the job finishes successfully, or failed when the job is terminated due to cancellation or a fatal exception.
  3. Worker executes a stored procedure. It creates the OracleConnection and the CallableStatement used to execute the stored procedure, and registers an onFailure callback with JobManager.completion to abort the connection and cancel the statement. This callback doesn't use the actor system's execution context; instead it uses an execution context created from a cached executor service in the BatchManager (a minimal sketch of this registration is shown after the list).
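
For illustration, the registration in item 3 might look roughly like the sketch below (Akka 2.x Java API). The class and parameter names (WorkerCancellationHook, CancellableStatement, cancellationExecutor) are assumptions, not the actual code; only JobManager.completion and the cached executor from the BatchManager come from the description above.

import java.util.concurrent.ExecutorService;

import scala.concurrent.ExecutionContext;
import scala.concurrent.Promise;

import akka.dispatch.ExecutionContexts;
import akka.dispatch.OnFailure;

public class WorkerCancellationHook {

    // Hypothetical wrapper around the OracleConnection / CallableStatement pair.
    public interface CancellableStatement {
        void cancel();
    }

    // jobCompletion is JobManager.completion; cancellationExecutor is the cached
    // executor service created in the BatchManager.
    public void register(Promise<?> jobCompletion,
                         ExecutorService cancellationExecutor,
                         final CancellableStatement statement) {
        // Run the callback on the cached executor rather than the actor system's
        // dispatcher, so a blocked cancellation cannot tie up actor threads.
        ExecutionContext ec = ExecutionContexts.fromExecutorService(cancellationExecutor);
        jobCompletion.future().onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable cause) {
                // Abort the connection and cancel the statement when the batch
                // fails or is cancelled via completion.tryFailure(...).
                statement.cancel();
            }
        }, ec);
    }
}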

My config is

{"akka" : { "actor" : { "default-dispatcher" : {
    "type" : "Dispatcher",
    "executor" : "default-executor",
    "throughput" : "1",
    "default-executor" : { "fallback" : "thread-pool-executor" }
    "thread-pool-executor" : {
        "keep-alive-time" : "60s",
        "core-pool-size-min" : coreActorCount,
        "core-pool-size-max" : coreActorCount,
        "max-pool-size-min" : maxActorCount,
        "max-pool-size-max" : maxActorCount,
        "task-queue-size" : "-1",
        "task-queue-type" : "linked",
        "allow-core-timeout" : "on"
}}}}}

The number of Workers is configured elsewhere; currently workerCount = 8, coreActorCount is workerCount + 3, and maxActorCount is workerCount * 3 + 5. I'm testing this on a MacBook Pro 10 with two cores and 8GB of memory; the production server is considerably larger. The database I'm talking to is behind an extremely slow VPN. I'm running all of this on Oracle's Java SE 1.8 JVM. The local server is Tomcat 7. The Oracle JDBC drivers are version 10.2 (I might be able to convince the powers that be to use a newer version). All methods either return void or Future<> and ought to be non-blocking.

When one batch terminates successfully there is no issue - the next batch starts immediately with a full complement of Workers. However, if I terminate the current batch via JobManager#completion.tryFailure(new CancellationException("Batch cancelled")), then the onFailure callbacks registered by the Workers fire and the system becomes unresponsive. Debug printlns indicate that the new batch starts with three out of eight functioning Workers, and the BatchManager becomes completely unresponsive (I added a Future<String> ping command that just returns Futures.successful("ping"), and this also times out). The onFailure callbacks execute on a separate thread pool, and even if they were on the actor system's thread pool I should have a high enough max-pool-size to accommodate the original JobManager, its Workers, its onFailure callbacks, and a second JobManager and its Workers. Instead I seem to be accommodating the original JobManager and its Workers, the new JobManager and fewer than half of its Workers, and nothing left over for the BatchManager. The computer I'm running this on is short on resources, but it seems like it ought to be able to run more than a dozen threads.

Is this a configuration issue? Is this due to a JVM-imposed limit and/or a Tomcat-imposed limit? Is this due to a problem with how I'm handling blocking IO? There are probably several other things I could be doing wrong, these are just what came to mind.

Gist of CancellableStatement where the CallableStatement and OracleConnection are cancelled

Gist of Immutable where CancellableStatements are created

Gist of JobManager's cleanup code

Config dump obtained via System.out.println(mergedConfig.toString());


Edit: I believe that I've narrowed down the problem to the actor system (either its configuration or its interaction with blocking database calls). I eliminated the Worker actors and moved their workload to Runnables that execute on a fixed-size ThreadPoolExecutor, where each JobManager creates its own ThreadPoolExecutor and shuts it down when the batch completes (shutdown() on normal termination, shutdownNow() on exceptional termination). Cancellation runs on a cached thread pool instantiated in the BatchManager. The actor system's dispatcher is still a ThreadPoolExecutor, but with only a half dozen threads allocated to it. Using this alternate setup, cancellation executes as expected - the workers terminate when their database connections are aborted, and the new JobManager executes immediately with a full complement of worker threads. This indicates to me that this is not a hardware/JVM/Tomcat issue.
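
A minimal sketch of that alternate setup, assuming hypothetical names (JobManagerWorkerPool, submit, terminate); only the fixed pool size and the shutdown()/shutdownNow() split come from the description above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobManagerWorkerPool {

    // Fixed-size pool owned by a single JobManager, one thread per worker slot.
    private final ExecutorService workerPool;

    public JobManagerWorkerPool(int workerCount) {
        this.workerPool = Executors.newFixedThreadPool(workerCount);
    }

    // Each stored-procedure call runs as a plain Runnable instead of a Worker actor.
    public void submit(Runnable storedProcedureCall) {
        workerPool.submit(storedProcedureCall);
    }

    // Called when the batch completes.
    public void terminate(boolean cancelled) {
        if (cancelled) {
            // Exceptional termination: interrupt running tasks, discard queued ones.
            workerPool.shutdownNow();
        } else {
            // Normal termination: stop accepting work, let in-flight tasks drain.
            workerPool.shutdown();
        }
    }
}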


Update: I did a thread dump using Eclipse's Memory Analyzer. I discovered that the cancellation threads were hanging on CallableStatement.close(), so I reordered the cancellation so that OracleConnection.abort() preceded CallableStatement.cancel() and this cleared up the problem - the cancellations all (apparently) executed correctly. The Worker threads continued to hang on their statements, though - I suspect that my VPN may be partially or totally to blame for this.

PerformanceAsync-akka.actor.default-dispatcher-19
  at java.net.SocketInputStream.socketRead0(Ljava/io/FileDescriptor;[BIII)I (Native Method)
  at java.net.SocketInputStream.read([BIII)I (SocketInputStream.java:150)
  at java.net.SocketInputStream.read([BII)I (SocketInputStream.java:121)
  at oracle.net.ns.Packet.receive()V (Unknown Source)
  at oracle.net.ns.DataPacket.receive()V (Unknown Source)
  at oracle.net.ns.NetInputStream.getNextPacket()V (Unknown Source)
  at oracle.net.ns.NetInputStream.read([BII)I (Unknown Source)
  at oracle.net.ns.NetInputStream.read([B)I (Unknown Source)
  at oracle.net.ns.NetInputStream.read()I (Unknown Source)
  at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1()S (T4CMAREngine.java:1109)
  at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1()B (T4CMAREngine.java:1080)
  at oracle.jdbc.driver.T4C8Oall.receive()V (T4C8Oall.java:485)
  at oracle.jdbc.driver.T4CCallableStatement.doOall8(ZZZZ)V (T4CCallableStatement.java:218)
  at oracle.jdbc.driver.T4CCallableStatement.executeForRows(Z)V (T4CCallableStatement.java:971)
  at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout()V (OracleStatement.java:1192)
  at oracle.jdbc.driver.OraclePreparedStatement.executeInternal()I (OraclePreparedStatement.java:3415)
  at oracle.jdbc.driver.OraclePreparedStatement.execute()Z (OraclePreparedStatement.java:3521)
  at oracle.jdbc.driver.OracleCallableStatement.execute()Z (OracleCallableStatement.java:4612)
  at com.util.CPProcExecutor.execute(Loracle/jdbc/OracleConnection;Ljava/sql/CallableStatement;Lcom/controller/BaseJobRequest;)V (CPProcExecutor.java:57)
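
The reordered cancellation might look roughly like the following sketch. The class shape and error handling are assumptions rather than the actual CancellableStatement gist; only the call order (OracleConnection.abort() before CallableStatement.cancel()) comes from the update above.

import java.sql.CallableStatement;

import oracle.jdbc.OracleConnection;

public class CancellableStatementSketch {

    private final OracleConnection connection;
    private final CallableStatement statement;

    public CancellableStatementSketch(OracleConnection connection, CallableStatement statement) {
        this.connection = connection;
        this.statement = statement;
    }

    public void cancel() {
        try {
            // Abort the connection first so that cancel()/close() cannot hang
            // waiting on a socket that is stuck behind the slow VPN.
            connection.abort();
        } catch (Exception e) {
            // Best-effort: log and keep going so the statement still gets cancelled.
        }
        try {
            statement.cancel();
        } catch (Exception e) {
            // Best-effort: log and ignore.
        }
    }
}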

However, even after fixing the cancellation order I still have the problem of the actor system not creating enough threads: I'm still only getting three out of eight Workers in the new batch, with new Workers being added as the cancelled Workers' network connections time out. In total I've got 11 threads (my core pool size) out of 29 (my max pool size). Apparently the actor system is ignoring my max-pool-size parameter, or I'm not configuring the max pool size correctly.

2 Answers
Answer 1 · 2019-06-21 09:03

(Disclaimer: I don't know Akka)

Given your configuration of task-queue-size = -1 below, I believe the task queue is unbounded.

  "task-queue-size": "-1",
  "task-queue-type": "linked"

A ThreadPoolExecutor will not create threads beyond the core pool size unless the work queue is full and cannot accept the task. Only when the task queue is full does it start creating threads, up to the maximum pool size.

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
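
A standalone demonstration of that behaviour with a plain ThreadPoolExecutor (not from the question; the 11/29 sizes are borrowed from it, and the unbounded LinkedBlockingQueue plays the role of task-queue-size = -1 with a linked queue):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {
    public static void main(String[] args) throws InterruptedException {
        // 11 core / 29 max, unbounded linked queue: the same shape as the
        // question's dispatcher settings.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                11, 29, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        // Submit more long-running tasks than the core size.
        for (int i = 0; i < 29; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(10_000);   // stand-in for a blocking database call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        Thread.sleep(1_000);
        // Prints 11: the 18 extra tasks wait in the unbounded queue and the pool
        // never grows toward the 29-thread maximum.
        System.out.println("pool size = " + pool.getPoolSize());
        pool.shutdownNow();
    }
}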

Please try configuring a bounded queue size and check whether the thread count grows toward the max pool size. Thanks.
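
As a sketch only, in the same format as the question's config, the change might look like this; the queue size of 1 is arbitrary, and note that once the queue is full and all threads are busy, further submissions would be rejected, so this needs testing:

"thread-pool-executor" : {
    "keep-alive-time" : "60s",
    "core-pool-size-min" : coreActorCount,
    "core-pool-size-max" : coreActorCount,
    "max-pool-size-min" : maxActorCount,
    "max-pool-size-max" : maxActorCount,
    "task-queue-size" : "1",
    "task-queue-type" : "linked",
    "allow-core-timeout" : "on"
}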

smile是对你的礼貌
Answer 2 · 2019-06-21 09:05

There's not enough code to provide a solution, but when the system becomes unresponsive you can check your system resource utilization (CPU, RAM); if those look normal, check the Oracle database.

If another job starts immediately when you kill a group of connections, I suspect there are blocking sessions at the Oracle level (an uncommitted write transaction blocking other write transactions on the same resources).

While the system is in the unresponsive state, check for blocking sessions:

SELECT s1.username || '@' || s1.machine
    || ' ( SID=' || s1.sid || ' )  is blocking '
    || s2.username || '@' || s2.machine || ' ( SID=' || s2.sid || ' ) ' AS blocking_status
    FROM v$lock l1, v$session s1, v$lock l2, v$session s2
    WHERE s1.sid=l1.sid AND s2.sid=l2.sid
    AND l1.BLOCK=1 AND l2.request > 0
    AND l1.id1 = l2.id1
    AND l1.id2 = l2.id2