-->

Catch Keyboard Interrupt to stop Python multiprocessing worker from working on queue

2019-02-23 16:31发布

问题:

From several posts found on Stack Overflow, I created this code.

Scenario

I want to have a multiprocessing.Queue on which several workers "listen".

In case of a keyboard interrupt, the main process should stop putting new items on the queue and, with the help of the sentinel objects, the workers should be stopped gracefully.

Problem

My problem with the current version, where I use

signal.signal(signal.SIGINT, signal.SIG_IGN)

to ignore Ctrl + C, is that the signal is also ignored by the main process.

Any ideas? Do I need to use the multiprocessing worker pool? Some examples indicate that I might have to. Can I then still use the queue?

from multiprocessing import Pool, Process,Queue
import time
import signal
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process


# Worker process that consumes items from a shared queue until it receives
# a None sentinel.
class Worker(Process):
    # queue: the queue this worker reads its work items from.
    # ident: any value; its str() form labels this worker's output.
    def __init__(self, queue,ident):
        super(Worker, self).__init__()
        # Ignore Signals
        # NOTE: __init__ runs in the process that constructs the Worker (the
        # main script below), not in the child process. This call therefore
        # makes the constructing process ignore Ctrl+C as well, which is the
        # problem described in the question.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.queue= queue
        self.idstr= str(ident)
        print "Ident" + self.idstr

    # Body of the child process; invoked in the new process after start().
    def run(self):
        print 'Worker started'
        # do some initialization here

        print 'Computing things!'
        # iter(callable, sentinel): keep calling self.queue.get() (blocking)
        # until it returns None, which ends the loop.
        for data in iter( self.queue.get, None ):
            print "#" + self.idstr + " : " + str(data)
            # Presumably stands in for real work: five seconds per item.
            time.sleep(5)
            print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())

        print "Worker Done"

#### Main ####
# Producer script: start the workers, feed them integers through a bounded
# queue, and shut them down with one None sentinel per worker, both on normal
# completion and after a KeyboardInterrupt.
try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2

# Number of worker processes; also the number of sentinels needed to stop them.
NUM_WORKERS = 4

# Bounded queue: put() blocks while 10 items are already waiting, which
# throttles the producer to the speed of the workers.
request_queue = Queue(10)

for i in range(NUM_WORKERS):
    Worker(request_queue, i).start()

try:
    for data in range(1000000):
        request_queue.put(data)
    # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(NUM_WORKERS):
        request_queue.put(None)

except KeyboardInterrupt:
    print("Caught KeyboardInterrupt, terminating workers")
    # Discard pending work. get_nowait() is used instead of an
    # empty()/get() pair because a worker may take the last item between the
    # two calls, which would leave a blocking get() waiting forever.
    while True:
        try:
            request_queue.get_nowait()
        except Empty:
            break
    # Every worker consumes exactly one sentinel before exiting, so a single
    # None would stop only one of them.
    for i in range(NUM_WORKERS):
        request_queue.put(None)

回答1:

Based on your solution (which is good), I added an additional layer of protection in case the main code is unresponsive and the user cancels twice:

import os
import signal

# Set to True by the first Ctrl+C. It must exist before the handler can run,
# otherwise the first SIGINT would raise NameError on the `if STOP` check.
STOP = False


def signal_handler(sig, frame):
    """Request a stop on the first SIGINT and force termination on the second.

    The first Ctrl+C only sets the module-level ``STOP`` flag. If the flag is
    already set when another Ctrl+C arrives, further SIGINTs are ignored and
    the process sends itself SIGTERM.

    Args:
        sig: Signal number passed in by the ``signal`` module (unused).
        frame: Interrupted stack frame passed in by the ``signal`` module
            (unused).
    """
    global STOP
    if STOP:
        # Second Ctrl+C: stop reacting to SIGINT and terminate this process.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.kill(os.getpid(), signal.SIGTERM)
    STOP = True


signal.signal(signal.SIGINT, signal_handler)


回答2:

I think I found a solution. Still, I don't like that I get the SIGINT once from the main process and four times from the workers, but maybe I have to live with that.

  1. I specified a signal handler for the interrupt signal (SIGINT).
  2. After receiving the first SIGINT, I ignore any further SIGINT signals.
  3. I switch the stop flag to True.
  4. I break out of the queue insert loop.
  5. I call the stop function, which clears the queue and inserts the stop sentinels.

    from multiprocessing import Pool, Process,Queue
    import time
    import signal
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process
    
    # Stop Flag for loop: set to True by signal_handler and polled by the
    # queue-filling loop in the main section below.
    stop = False
    
    # SIGINT handler: report the Ctrl+C, raise the stop flag, and switch the
    # receiving process to ignoring any further SIGINT.
    # sig, frame: the arguments Python passes to every signal handler (unused).
    def signal_handler(sig, frame):
        print 'You pressed Ctrl+C!'
        global stop
        stop = True
        # Ignore more Ctrl+C
        signal.signal(signal.SIGINT, signal.SIG_IGN) 
    
    # Install the handler at module level, before any Worker is started.
    signal.signal(signal.SIGINT, signal_handler)
    
    def stopSentinel(request_queue, num_workers=4):
        """Discard all pending work and queue one stop sentinel per worker.

        Args:
            request_queue: Queue shared with the workers; it must provide
                ``get_nowait()`` and ``put()``.
            num_workers: Number of ``None`` sentinels to insert, one for each
                worker that has to stop. Defaults to 4.
        """
        # Imported here so the function is self-contained on Python 2 and 3.
        try:
            from queue import Empty  # Python 3
        except ImportError:
            from Queue import Empty  # Python 2

        print("CTRL Stop Queue and insert None")

        # Empty Existing Queue. get_nowait() is used instead of an
        # empty()/get() pair because a worker may take the last item between
        # the two calls, which would leave a blocking get() waiting forever.
        while True:
            try:
                request_queue.get_nowait()
            except Empty:
                break

        # Put One None for each Worker
        for _ in range(num_workers):
            request_queue.put(None)
    
    
    class Worker(Process):
        """Process that handles queue items until it receives a None sentinel."""

        def __init__(self, queue, ident):
            """Store the shared queue and a printable label for this worker.

            Args:
                queue: Queue the worker reads its work items from.
                ident: Any value; its ``str()`` form labels the output.
            """
            super(Worker, self).__init__()

            self.queue = queue
            self.idstr = str(ident)
            print("Ident" + self.idstr)

        def run(self):
            """Consume items from the queue until a None sentinel arrives."""
            # run() executes in the child process. Ignoring SIGINT here keeps
            # a Ctrl+C from triggering the handler inherited from the parent
            # and from interrupting the worker's blocking calls; shutdown is
            # driven only by the sentinel placed on the queue.
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            print('Worker started')
            # do some initialization here

            print('Computing things!')
            # iter(callable, sentinel): call queue.get() until it returns None.
            for data in iter(self.queue.get, None):
                print("#" + self.idstr + " : " + str(data))
                time.sleep(5)
                print("#" + self.idstr + "Queue Size: " + str(self.queue.qsize()))

            print("Worker Done")
    
    
    
    #### Main #####
    # Bounded queue shared by the producer loop below and the four workers.
    request_queue = Queue(10)

    # Start the four consumer processes.
    for worker_id in range(4):
        worker = Worker(request_queue, worker_id)
        worker.start()

    #### Fill Queue with Data ####
    for item in range(1000000):
        request_queue.put(item)

        # Check for Stop: the SIGINT handler sets the flag, so leave the loop
        # as soon as it is seen.
        print("Check Breakout")
        if stop:
            print("Stop Break")
            break

    if not stop:
        # Sentinel objects to allow clean shutdown: 1 per worker.
        print("Normal Stop")
        for _ in range(4):
            request_queue.put(None)
    else:
        # Interrupted: hand the queue to the stop routine.
        stopSentinel(request_queue)