I have a few general questions about using Python and Redis to create a job queue application for running asynchronous commands. Here is the code I have so far:
import redis
from multiprocessing import Pool

def queueCmd(cmd):
    # push a command onto the tail of the queue
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0, -1)

def work():
    # pop a command off the head of the queue and consume it
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    # callback run in the manager when a worker finishes
    print "pop goes the weasel"

if __name__ == '__main__':
    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    printCmdQueue()

    pool = Pool(processes=2)
    print "cnt:", r_server.llen("cmds")

    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()
First, am I correct in believing that if I need to communicate anything back to the manager, I should do so with a callback? The quick examples I have seen instead store the async call in a result and access it via result.get(timeout=1). By communication, I mean putting stuff back into a redis list.
Edit: if the command runs asynchronously and I time out on the result inside the manager, does that time out the worker or just that get() call inside the manager? If it is only the manager, couldn't I use this to check for exit codes from the worker?
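To illustrate the two styles I mean (run_job and on_done are made-up names, just for this sketch):

from multiprocessing import Pool, TimeoutError

def run_job(cmd):
    # stand-in worker: pretend to run cmd and return an exit code
    return 0

def on_done(exit_code):
    # callback style: this runs in the manager when a worker finishes
    print "job finished with exit code", exit_code

if __name__ == '__main__':
    pool = Pool(processes=2)

    # style 1: communicate back via a callback
    pool.apply_async(run_job, ["ls -la"], callback=on_done)

    # style 2: store the async call in a result and poll it
    result = pool.apply_async(run_job, ["sleep 30"])
    try:
        print result.get(timeout=1)
    except TimeoutError:
        print "no result within the timeout"

    pool.close()
    pool.join()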
Next, this code produces the following output:
['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]
Why does the worker consume multiple cmds at a time even though I am popping them off one at a time? On a similar note, this doesn't always end cleanly and sometimes requires a ctrl+c. To deal with this I clear out the queue and run it again. I think this relates to the apply_async() call and when to get out of the loop. I am wondering if more needs to happen on the worker side?
If I change the if to the commented-out version, I get:
ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'
This seems like it would be a better way to check whether I need to break, but it seems that function returns a string at times?
Any advice on improving this would be much appreciated. I am simply trying to make a manager which will run like a service/daemon on a linux machine. It will get jobs (currently commands, but possibly more) from a redis list and return results to another redis list. Then, down the road, a GUI will interact with this manager to get the status of queues and retrieve results.
Thanks,
EDIT:
I realized I was being a bit of a goof. I do not need to access the redis server from a worker, and doing so was leading to some of the errors (specifically the ValueError).
To fix this, the loop is now:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])
After these lines I call pool.close(). I used os.getpid() and os.getppid() to check that I did in fact have multiple children running around.
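Put together, the revised approach looks something like this (the subprocess handling and the "results" list are additions for this sketch, not in my code above):

import os
import subprocess
import redis
from multiprocessing import Pool

def work(cmd):
    # the worker runs the shell command itself and never touches
    # the manager's redis connection
    print "pid %d (parent %d) running: %s" % (os.getpid(), os.getppid(), cmd)
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    return (cmd, proc.returncode)

def store_result(result):
    # callback runs in the manager, so its connection is safe to use here
    cmd, code = result
    r_server.rpush("results", "%s -> exit %d" % (cmd, code))

if __name__ == '__main__':
    r_server = redis.Redis("localhost")
    pool = Pool(processes=2)

    while not r_server.llen("cmds") == 0:
        cmd = r_server.lpop("cmds")
        pool.apply_async(work, [cmd], callback=store_result)

    pool.close()
    pool.join()
    print r_server.lrange("results", 0, -1)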
I would still enjoy hearing if this sounds like a good way to create a manager/worker application that uses redis.
Your problem is that you are trying to run multiple commands concurrently with a single redis connection.
You are expecting something like

    manager: LRANGE -> manager reads the LRANGE reply
    worker:  LPOP   -> worker reads the LPOP reply

but you are getting

    manager: LRANGE, worker: LPOP
    worker reads the LRANGE reply, manager reads the LPOP reply

The results come back in the same order the commands were sent, but there is nothing linking a thread or command to a specific result. Individual redis connections are not thread safe - you will need one for each worker thread.
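One way to give each worker its own connection is a Pool initializer, which runs once in every child process after the fork (sketched here against your code):

import redis
from multiprocessing import Pool

def init_worker():
    # runs once per child process, so each worker gets its own socket
    global r_server
    r_server = redis.Redis("localhost")

def work():
    # safe now: this lpop uses the worker's private connection
    print "command being consumed: ", r_server.lpop("cmds")

if __name__ == '__main__':
    pool = Pool(processes=2, initializer=init_worker)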
You can also see similar problems if you use pipelining inappropriately - it is designed for write-only scenarios, like adding lots of items to a list, where you can improve performance by assuming each LPUSH succeeded rather than waiting for the server to confirm each item. Redis will still return the results, but they will not necessarily be the results of the last command sent.
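Appropriate pipeline use looks something like this bulk enqueue (a sketch using redis-py's pipeline()):

import redis

r = redis.Redis("localhost")

# queue many jobs in one round trip instead of one per push
pipe = r.pipeline()
for i in range(100):
    pipe.rpush("cmds", "echo job %d" % i)
print pipe.execute()  # replies come back as a list, in order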
Other than that, the basic approach is reasonable. There are a couple of enhancements you could make though:
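For example (a minimal sketch, assuming redis-py's blpop), a worker can block until a job arrives instead of polling the queue length in a loop:

import redis

r = redis.Redis("localhost")

# blpop blocks until an item is available or the timeout expires,
# so an idle worker does not spin on an empty queue
item = r.blpop("cmds", timeout=5)
if item is not None:
    key, cmd = item  # blpop returns a (list_name, value) pair
    print "got job:", cmd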