This is the problem I have: I'm using Python 2.7, and I have code that runs in a thread and contains a critical region that only one thread should execute at a time. That code currently has no mutex mechanism, so I wanted to ask what I could use for my specific use case, which involves "dropping" of "queued" functions. I've tried to simulate that behavior with the following minimal working example:
useThreading=False # True
if useThreading: from threading import Thread, Lock
else: from multiprocessing import Process, Lock
mymutex = Lock()
import time
tstart = None
def processData(data):
    #~ mymutex.acquire()
    try:
        print('thread {0} [{1:.5f}] Do some stuff'.format(data, time.time()-tstart))
        time.sleep(0.5)
        print('thread {0} [{1:.5f}] 1000'.format(data, time.time()-tstart))
        time.sleep(0.5)
        print('thread {0} [{1:.5f}] done'.format(data, time.time()-tstart))
    finally:
        #~ mymutex.release()
        pass
# main:
tstart = time.time()
for ix in xrange(0,3):
    if useThreading: t = Thread(target = processData, args = (ix,))
    else: t = Process(target = processData, args = (ix,))
    t.start()
    time.sleep(0.001)
Now, if you run this code, you get a printout like this:
thread 0 [0.00173] Do some stuff
thread 1 [0.00403] Do some stuff
thread 2 [0.00642] Do some stuff
thread 0 [0.50261] 1000
thread 1 [0.50487] 1000
thread 2 [0.50728] 1000
thread 0 [1.00330] done
thread 1 [1.00556] done
thread 2 [1.00793] done
That is to say, the three threads quickly get "queued" one after another (some 2-3 ms apart). Actually, they don't get queued at all; they simply start executing in parallel, 2-3 ms after each other.
Now, if I enable the mymutex.acquire()/.release() commands, I get what would be expected:
thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
thread 1 [1.00350] Do some stuff
thread 1 [1.50462] 1000
thread 1 [2.00531] done
thread 2 [2.00547] Do some stuff
thread 2 [2.50638] 1000
thread 2 [3.00706] done
Basically, now with locking, the threads don't run in parallel, but they run one after another thanks to the lock - as long as one thread is working, the others will block at the .acquire(). But this is not exactly what I want to achieve, either.
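For reference, the same serialized behavior can be written with the lock used as a context manager instead of explicit .acquire()/.release() calls. This is only a minimal sketch, assuming the threading variant; the names process_data, run_serialized and events are made up for illustration, and the sleep is shortened. Note that, per the threading documentation, which of several blocked threads proceeds on release is not defined, and a plain Lock exposes no queue of waiters to inspect.

```python
import threading
import time

mymutex = threading.Lock()
events = []  # (thread id, stage) tuples, appended in execution order

def process_data(data):
    # `with` acquires the lock on entry and releases it on exit,
    # even if the body raises (same effect as acquire/try/finally/release).
    with mymutex:
        events.append((data, 'start'))
        time.sleep(0.05)  # stand-in for the real work
        events.append((data, 'done'))

def run_serialized(n=3):
    """Start n threads and return the recorded events once all have finished."""
    del events[:]
    threads = [threading.Thread(target=process_data, args=(ix,))
               for ix in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return list(events)
```

Calling run_serialized(3) should yield start/done pairs that never interleave, although the order in which the threads get the lock is not guaranteed.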
What I want to achieve is this: let's assume that when .acquire() is first triggered by a thread function, it registers an id of the function (say, a pointer to it) in a queue. After that, the behavior is basically the same as with the Lock - while one thread works, the others block at .acquire(). When the first thread is done, it enters the finally: block - and here, I'd like to check how many threads are waiting in the queue; then I'd like to delete/drop all waiting threads except for the very last one - and finally, I'd .release() the lock, meaning that what was the last thread in the queue would execute next. I imagine I would want to write something like the following pseudocode:
...
finally:
    if (len(mymutex.queue) > 2): # more than this instance plus one other waiting:
        while (len(mymutex.queue) > 2):
            mymutex.queue.pop(1) # leave alone [0]=this instance, remove next element
    # at this point, there should be only queue[0]=this instance, and queue[1]= what was the last thread queued previously
    mymutex.release() # once we release, queue[0] should be gone, and the next in the queue should acquire the mutex/lock..
    pass
...
With that, I'd expect a printout like this:
thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
# here upon lock release, thread 1 would be deleted - and the last one in the queue, thread 2, would acquire the lock next:
thread 2 [1.00350] Do some stuff
thread 2 [1.50462] 1000
thread 2 [2.00531] done
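To make the desired behavior more concrete, here is a rough sketch of how it might be approximated with threading.Condition. It is not an existing library feature: LatestWinsGate, process_data and demo are hypothetical names, and the sketch assumes threads only (the multiprocessing branch would need state shared across processes, which is not covered here). It also differs slightly from the pseudocode above: an older waiter is dropped as soon as a newer one arrives, rather than at the moment the lock is released.

```python
import threading
import time

class LatestWinsGate(object):
    """Hypothetical helper: one holder at a time; of the waiters,
    only the most recent arrival gets to run, older ones are dropped."""

    def __init__(self):
        self._cond = threading.Condition()
        self._busy = False   # True while some thread holds the gate
        self._latest = 0     # ticket number of the most recent arrival

    def acquire(self):
        """Return True if the caller now holds the gate, False if dropped."""
        with self._cond:
            self._latest += 1
            ticket = self._latest
            # Wake older waiters so they notice they have been superseded.
            self._cond.notify_all()
            while self._busy and ticket == self._latest:
                self._cond.wait()
            if ticket != self._latest:
                return False  # a newer thread arrived while we waited
            self._busy = True
            return True

    def release(self):
        with self._cond:
            self._busy = False
            self._cond.notify_all()

def process_data(data, gate, log):
    if not gate.acquire():
        log.append((data, 'dropped'))
        return
    try:
        log.append((data, 'start'))
        time.sleep(0.3)  # stand-in for the real work
        log.append((data, 'done'))
    finally:
        gate.release()

def demo():
    """Start three threads 50 ms apart and return what each one did."""
    gate, log, threads = LatestWinsGate(), [], []
    for ix in range(3):
        t = threading.Thread(target=process_data, args=(ix, gate, log))
        t.start()
        threads.append(t)
        time.sleep(0.05)  # stagger arrivals so the order is 0, 1, 2
    for t in threads:
        t.join()
    return log
```

With the staggered starts in demo(), thread 0 should run to completion, thread 1 should be dropped once thread 2 arrives, and thread 2 should run after thread 0 releases the gate. A dropped caller has to check the return value of acquire() and must not call release().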
What would be the most straightforward way to achieve this in Python?