Python & Redis: Manager/Worker application best pr

Posted 2019-08-04 12:47

Question:

I have a few general questions about using Python and Redis to create a job queue application for running asynchronous commands. Here is the code I have generated so far:

import redis
from multiprocessing import Pool

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0 , -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':

    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")

    printCmdQueue()

    pool = Pool(processes=2)

    print "cnt:", +r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()

First, am I correct in believing that if I need to communicate anything back to the manager, I should do so with a callback? The quick examples I have seen store the async call in a result and access it via result.get(timeout=1). By communication, I mean putting things back into a Redis list.

Edit: if the command is run asynchronously and I time out on the result inside main, does that time out the worker or just that operation inside the manager? If only the manager, couldn't I use this to check for exit codes from the worker?

Next, this code produces the following output:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
Terminate pool
command being consumed:  None
 pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

Why does the worker seem to consume multiple commands at a time even though I am popping them off one at a time? On a similar note, this doesn't always end nicely and sometimes requires a Ctrl+C. To deal with this I clear out the queue and go again. I think this relates to apply_async() and the if used to get out of the loop. I am wondering if more needs to happen on the worker side?

If I swap the if for the commented-out one, I get:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

This seems like a better way to check whether I need to break, but it seems that llen() is returning a string at times?

Any advice on improving this would be much appreciated. I am simply trying to make a manager that will run as a service/daemon on a Linux machine. It will get jobs (currently commands, but possibly more) from a Redis list and return results into a Redis list. Then, down the road, a GUI will interact with this manager to get the status of queues and return results.

Thanks,

EDIT:

I realized I was being a bit of a goof. I do not need to access the redis server from a worker and that was leading to some errors (specifically the ValueError).

To fix this the loop is now:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])

After these lines I call pool.close(). I used os.getpid() and os.getppid() to check that I did in fact have multiple children running around.
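For illustration, the revised loop could be sketched roughly as follows. This is only a sketch under assumptions: `dispatch_all` is a hypothetical helper name, `r` is assumed to be a redis-py style client with an `lpop` method, and `work` is assumed to run the command through the shell and hand back its exit code, which the original `work` does not do.

```python
import subprocess


def work(cmd):
    """Run one shell command in a worker and return (cmd, exit code)."""
    # Assumption: commands are trusted shell strings, as in the question.
    return cmd, subprocess.call(cmd, shell=True)


def dispatch_all(r, pool, queue_name="cmds"):
    """Pop every queued command in the manager and hand each one to the pool."""
    pending = []
    while True:
        cmd = r.lpop(queue_name)      # only the manager talks to Redis
        if cmd is None:               # None means the list is empty
            break
        pending.append(pool.apply_async(work, [cmd]))
    pool.close()                      # no more tasks will be submitted
    pool.join()                       # wait for the workers to finish
    return [res.get() for res in pending]
```

With a real setup this would be called as `dispatch_all(redis.Redis("localhost"), Pool(processes=2))`; the returned pairs are one way to get exit codes back to the manager.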

I would still enjoy hearing if this sounds like a good way to create a manager/worker application that uses redis.

Answer 1:

Your problem is that you are trying to run multiple commands concurrently over a single Redis connection.

You are expecting something like

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
command      
             LLEN test
             0

but you are getting

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
             LLEN test
             command
0

The results come back in the same order, but there is nothing linking a thread or command to a specific result. Individual redis connections are not thread safe - you will need one for each worker thread.
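One way to give every worker its own connection, sketched here for a multiprocessing pool, is to open the connection in the pool's initializer. The names `init_worker`, `consume_one`, `connect_localhost` and `make_pool` are hypothetical, and the sketch assumes redis-py is installed and a server is listening on localhost.

```python
from multiprocessing import Pool

_conn = None  # one connection per worker process, set by init_worker


def init_worker(connect):
    """Runs once in each worker process and opens that worker's own connection."""
    global _conn
    _conn = connect()


def consume_one(queue_name="cmds"):
    """Pop a single command using this worker's private connection."""
    return _conn.lpop(queue_name)


def connect_localhost():
    # Assumption: redis-py is installed and a server listens on localhost.
    import redis
    return redis.Redis("localhost")


def make_pool(processes=2, connect=connect_localhost):
    # initializer and initargs are standard multiprocessing.Pool parameters.
    return Pool(processes=processes, initializer=init_worker,
                initargs=(connect,))
```

The manager would then submit `consume_one` with `pool.apply_async(consume_one)`, and no worker would share a socket with the manager or with another worker.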

You can also see similar problems if you use pipelining inappropriately. It is designed for write-only scenarios such as adding lots of items to a list, where you can improve performance by assuming each LPUSH succeeded rather than waiting for the server to confirm it after every item. Redis will still return the results, but they will not necessarily be the results of the last command sent.
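As a rough sketch of the write-only case, a batch of commands can be queued through a redis-py pipeline. `enqueue_all` is a hypothetical helper, and `r` is assumed to be a redis-py client (anything exposing `pipeline()`).

```python
def enqueue_all(r, cmds, queue_name="cmds"):
    """Queue many commands at once; replies are collected together at the end."""
    pipe = r.pipeline()
    for cmd in cmds:
        pipe.rpush(queue_name, cmd)   # buffered locally, nothing is sent yet
    return pipe.execute()             # one reply per buffered command, in order
```

The replies are read only once, after everything has been sent, which is why this pattern suits bulk writes and not code that needs each answer before issuing the next command.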

Other than that, the basic approach is reasonable. There are a couple of enhancements you could make though:

  • Rather than checking the length, just call LPOP, which does not block - if it returns null (None in Python), the list is empty
  • Add a timer so that if the list is empty it will wait rather than just issuing another command.
  • Include a cancellation check in the while loop condition
  • Handle connection errors - I use an outer loop set up so that if the connection fails the worker will attempt to reconnect (basically restart main) for a reasonable number of attempts before terminating the worker process altogether.
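The four points above might be combined along these lines. This is a Python 3 sketch, not the setup described in the answer: `run_worker` and all of its parameters are hypothetical, `connect` is assumed to return a redis-py style client, and with redis-py you would pass `conn_errors=(redis.ConnectionError,)` because its error class is not the built-in `ConnectionError`.

```python
import time


def run_worker(connect, handle, should_stop, queue_name="cmds",
               idle_sleep=1.0, max_reconnects=5,
               conn_errors=(ConnectionError,)):
    """Consume commands until should_stop() is true or reconnects run out."""
    failures = 0
    # Outer loop: (re)connect, giving up after max_reconnects failures in a row.
    while failures <= max_reconnects and not should_stop():
        try:
            conn = connect()
            # Inner loop: the cancellation check is part of the loop condition.
            while not should_stop():
                cmd = conn.lpop(queue_name)   # None means the list is empty
                if cmd is None:
                    time.sleep(idle_sleep)    # wait instead of polling flat out
                    continue
                handle(cmd)
                failures = 0                  # a handled command resets the count
        except conn_errors:
            failures += 1
            time.sleep(idle_sleep)            # brief pause before reconnecting
    return failures
```

Here `should_stop` could be as simple as `threading.Event().is_set` or a check on a flag stored in Redis; the return value lets the caller tell a clean stop from giving up after repeated connection failures.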