Apache Flink 1.4.2 akka.actor.ActorNotFound

Posted 2019-08-26 20:54

Question:

After upgrading to Apache Flink 1.4.2, we get the following errors every few seconds on one of our three TaskManagers.

2018-06-27 17:33:46.632 [jobmanager-future-thread-2] DEBUG o.a.flink.runtime.rest.handler.legacy.metrics.MetricFetcher  - Could not retrieve QueryServiceGateway.
java.util.concurrent.CompletionException: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@tm03-dev:6124/), Path(/user/MetricQueryService_64bde0e9e6f3f0a906a30e88c261c9d7)]
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
        at akka.dispatch.OnComplete.internal(Future.scala:258)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
        at scala.concurrent.Promise$class.failure(Promise.scala:104)
        at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:157)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:68)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:76)
        at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:75)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
        at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:558)
        at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:595)
        at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:584)
        at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:98)
        at akka.remote.ReliableDeliverySupervisor$$anonfun$gated$1.applyOrElse(Endpoint.scala:353)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@tm03-dev:6124/), Path(/user/MetricQueryService_64bde0e9e6f3f0a906a30e88c261c9d7)]
        ... 27 common frames omitted



2018-06-27 17:34:01.625 [flink-akka.actor.default-dispatcher-19] DEBUG org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler  - Error while handling request.
java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.NotFoundException: Could not find job 93d6fa4fb5b2355bb03253cb80d81ef5.
        at org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler.lambda$handleJsonRequest$0(AbstractExecutionGraphRequestHandler.java:70)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:130)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:444)
        at akka.dispatch.OnComplete.internal(Future.scala:259)
        at akka.dispatch.OnComplete.internal(Future.scala:256)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
        at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:123)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.rest.NotFoundException: Could not find job 93d6fa4fb5b2355bb03253cb80d81ef5.
        ... 53 common frames omitted

The respective TaskManager disappears and reappears in the cluster but no job can be successfully run on it.

Trying to run a job on this TaskManager gives:

TaskManager (64bde0e9e6f3f0a906a30e88c261c9d7 @ tm03-dev (dataPort=6125)) not responding after a timeout of 10000 ms
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$3(Execution.java:529)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@tm03-dev:6124/user/taskmanager#-636625535]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)

Also, the TaskManager seems to send no heartbeat response back to the JobManager:

2018-06-28 08:07:52.462 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-7 - Sending Heartbeat to [akka.tcp://flink@tm01-dev:6124]
2018-06-28 08:07:52.462 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-7 - Sending Heartbeat to [akka.tcp://flink@tm01-dev:6124]
2018-06-28 08:07:52.462 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-7 - Sending Heartbeat to [akka.tcp://flink@tm02-dev:6124]
2018-06-28 08:07:52.462 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-7 - Sending Heartbeat to [akka.tcp://flink@tm02-dev:6124]
2018-06-28 08:07:52.462 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-7 - Sending Heartbeat to [akka.tcp://flink@tm03-dev:6124]
2018-06-28 08:07:52.462 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-7 - Sending Heartbeat to [akka.tcp://flink@tm03-dev:6124]
2018-06-28 08:07:52.463 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-6 - Received heartbeat rsp from [akka.tcp://flink@tm02-dev:6124]
2018-06-28 08:07:52.463 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-6 - Received heartbeat rsp from [akka.tcp://flink@tm02-dev:6124]
2018-06-28 08:07:52.463 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-6 - Received heartbeat rsp from [akka.tcp://flink@tm01-dev:6124]
2018-06-28 08:07:52.463 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.RemoteWatcher flink-akka.remote.default-remote-dispatcher-6 - Received heartbeat rsp from [akka.tcp://flink@tm01-dev:6124]
2018-06-28 08:07:52.653 [flink-akka.actor.default-dispatcher-2] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-6 - Association with remote system [akka.tcp://flink@tm03-dev:6124] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@tm03-dev:6124]] Caused by: [tm03-dev: Name or service not known]
2018-06-28 08:07:52.653 [flink-akka.actor.default-dispatcher-2] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-6 - Association with remote system [akka.tcp://flink@tm03-dev:6124] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@tm03-dev:6124]] Caused by: [tm03-dev: Name or service not known]

The cluster was recreated from scratch with the same results. This did not happen with 1.3.2.

What can be causing this?

Answer 1:

We did a couple of things here, after which the error disappeared:

We restarted the JobManagers.

We increased the TaskManager memory size from 1GB to 2GB.

I can't explain it, but the error has not shown up for a few weeks now and everything seems fine.
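
One more thing that may be worth checking: the WARN lines in the question end with "tm03-dev: Name or service not known", which suggests that the JobManager host could not resolve the TaskManager's hostname at that moment. The following is only a minimal sketch for testing name resolution from the JobManager machine, not a confirmed fix. The class name ResolveCheck and the method unresolved are made up for illustration; the default hostnames are the ones that appear in the logs above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): reports which hostnames
// cannot be resolved from the machine it runs on.
public class ResolveCheck {

    /** Returns the subset of the given hosts for which name resolution fails. */
    static List<String> unresolved(String... hosts) {
        List<String> failed = new ArrayList<>();
        for (String host : hosts) {
            try {
                // Throws UnknownHostException if the name cannot be resolved.
                InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                failed.add(host);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Defaults are the TaskManager hostnames seen in the question's logs.
        String[] hosts = args.length > 0
                ? args
                : new String[] {"tm01-dev", "tm02-dev", "tm03-dev"};
        System.out.println("Unresolved hosts: " + unresolved(hosts));
    }
}
```

Run it on the JobManager host, for example with "java ResolveCheck tm03-dev". If tm03-dev shows up as unresolved, even intermittently, the DNS or /etc/hosts setup would be a more likely culprit than Flink itself.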