I have a few general questions about using Python and Redis to create a job queue application for running asynchronous commands. Here is the code I have generated so far:
import redis
from multiprocessing import Pool

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0, -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':
    r_server = redis.Redis("localhost")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    printCmdQueue()
    pool = Pool(processes=2)
    print "cnt:", r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break
    printCmdQueue()
First, am I correct in believing that if I need to communicate anything back to the manager, I should do so with a callback? The quick examples I've seen instead store the async call in a result and access it via result.get(timeout=1). By communication, I mean putting results back into a Redis list.
Edit: if the command is run async and I time out on the result inside the main process, does that time out the worker or just that operation inside the manager? If it is only the manager, couldn't I use this to check for exit codes from the worker?
Next, this code produces the following output:
['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]
Why does the worker want to consume multiple cmds at a time even though I am popping them off one at a time? On a similar note, this doesn't always end nicely and sometimes requires a ctrl+c. To deal with this I clear out the queue and go again. I think this relates to apply_async() and how I get out of the loop. I am wondering if more needs to happen on the worker side?
If I change the ifs to the one commented out, I get:
ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'
This seems like it would be a better way to check whether I need to break, but it seems that function sometimes returns a string?
Any advice on improving this would be much appreciated. I am simply trying to make a manager that will run like a service/daemon on a Linux machine. It will get jobs (currently commands, but possibly more) from a Redis list and return results back into a Redis list. Down the road a GUI will interact with this manager to get the status of queues and retrieve results.
Thanks,
EDIT:
I realized I was being a bit of a goof. I do not need to access the redis server from a worker and that was leading to some errors (specifically the ValueError).
To fix this the loop is now:

    while not r_server.llen("cmds") == 0:
        cmd = r_server.lpop("cmds")
        pool.apply_async(work, [cmd])

After these lines I call pool.close(). I used os.getpid() and os.getppid() to check that I did in fact have multiple children running around.
I would still enjoy hearing if this sounds like a good way to create a manager/worker application that uses redis.