I am in a situation where I want to stop/cancel the flink job from the code. This is in my integration test where I am submitting a task to my flink job and check the result. As the job runs, asynchronously, it doesn't stop even when the test fails/passes. I want to job the stop after the test is over.
I tried a few things which I am listing below :
- Get the jobmanager actor
- Get running jobs
- For each running job, send a cancel request to the jobmanager
This, of course in not running but I am not sure whether the jobmanager actorref is wrong or something else is missing.
The error I get is : [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'
which means either the job manager actor ref is wrong or the message sent to it is incorrect.
The code looks like the following:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
Can someone check if this is the correct approach ?
EDIT : To completely stop the job, it is necessary to stop the TaskManager along with the JobManager in the order TaskManager first and then JobManager.
You're creating a new
ActorSystem
and then try to find an actor with the name/user/jobmanager_1
in the same actor system. This won't work, since the actual job manager will run in a differentActorSystem
.If you want to obtain an
ActorRef
to the real job manager, you either have to use the sameActorSystem
for the selection (then you can use a local address) or you have find out the remote address for the job manager actor. The remote address has the formatakka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]
. If you have access to theFlinkMiniCluster
then you can use theleaderGateway
promise to obtain the current leader'sActorGateway
.