I have a multiprocessing program where
- one process adds elements to a shared list (multiprocessing.Manager().list())
- several other processes consume elements from that list (and remove them); they run as long as there is something to process in the list or the process above is still adding to it.
I implemented locking (via multiprocessing.Lock()) around additions to and removals from the list. Since there is one "feeder" process and several (10-40) "consumer" processes all competing for the lock, and the consumers are fast, the "feeder" process has a hard time acquiring it.
Is there a concept of "priority" when acquiring a lock? I would like the "feeder" process to acquire it with higher priority than the others.
Right now I mitigate the issue by having the "consumer" processes wait a random time before trying to acquire the lock while the "feeder" process is active (when it finishes, it sets a flag). This workaround works, but it is ugly and hardly effective: I have the processes wait random.random()*n seconds, where n is the number of processes. That is a completely made-up number, probably wrong.
Make the feeder's acquisition of the lock blocking, and the consumers' non-blocking.
So for the feeder:
with my_lock:  # acquire() blocks by default; the lock is released automatically on exit
    do_stuff()
And the consumers:
while True:
    # block=False makes acquire() return immediately instead of waiting
    locked = my_lock.acquire(block=False)
    if locked:
        try:
            do_stuff()
        finally:
            my_lock.release()
    time.sleep(10)  # back off before retrying
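A complete, runnable sketch of this pattern (the feeder/consumer bodies, the item count, and the stop condition are made up for the demo; a real consumer would check a shared "feeder is done" flag instead):

```python
import multiprocessing as mp
import time

N_ITEMS = 5  # made-up workload size for this demo

def feeder(lock, shared):
    for i in range(N_ITEMS):
        with lock:  # blocking acquire: the feeder always waits its turn
            shared.append(i)

def consumer(lock, shared, out):
    taken = 0
    while taken < N_ITEMS:  # demo stop condition
        # Non-blocking acquire: back off instead of contending with the feeder.
        if lock.acquire(block=False):
            try:
                if shared:
                    out.put(shared.pop(0))
                    taken += 1
            finally:
                lock.release()
        else:
            time.sleep(0.01)

def run_demo():
    manager = mp.Manager()
    shared = manager.list()
    lock = mp.Lock()
    out = mp.Queue()
    f = mp.Process(target=feeder, args=(lock, shared))
    c = mp.Process(target=consumer, args=(lock, shared, out))
    f.start()
    c.start()
    f.join()
    c.join()
    return sorted(out.get() for _ in range(N_ITEMS))

if __name__ == "__main__":
    print(run_demo())  # [0, 1, 2, 3, 4]
```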
Instead of trying to change the locking priority, try changing the process priority itself so that the feeder has a higher priority than the consumers. The workaround you used basically simulates this, but less efficiently.
To change process priority:
- On Unix, use os.setpriority(). Refer to the docs.
- On Windows, use the third-party module psutil. Refer to this thread and the psutil docs.
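On Unix, a minimal sketch (run e.g. at the top of each consumer's target function) could look like this; note that unprivileged processes can only raise their nice value, so it is easier to de-prioritize the consumers than to boost the feeder:

```python
import os

# Raise this process's "nice" value, i.e. lower its scheduling priority.
# pid 0 means "the calling process".
os.setpriority(os.PRIO_PROCESS, 0, 10)
print(os.getpriority(os.PRIO_PROCESS, 0))
```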
It is not perfect, but it should work:
In "feeder":
feeder_lock_object.acquire()
consumer_lock_object.acquire()
try:
    ...
finally:
    consumer_lock_object.release()
    feeder_lock_object.release()
In "consumer":
while True:
    with consumer_lock_object:
        # multiprocessing.Lock has no is_locked attribute;
        # probe it with a non-blocking acquire instead
        if not feeder_lock_object.acquire(block=False):
            continue  # the feeder holds its lock, so let it go first
        feeder_lock_object.release()
        ...
But I think it would be better to use a multiprocessing.Queue.
If you use this method, be careful about how you share the lock objects. You should initialize the pool with an initializer function that stores these lock objects in global variables. Refer to this.
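A sketch of the Queue-based approach (the worker count, item count, and the doubling step are made up for illustration): the queue replaces both the shared list and the explicit lock, since put/get are internally synchronized, and a sentinel per consumer signals the end of the feed.

```python
import multiprocessing as mp

N_CONSUMERS = 4
N_ITEMS = 100

def feeder(q):
    # Queue.put is internally synchronized: no explicit Lock is needed.
    for i in range(N_ITEMS):
        q.put(i)
    # One sentinel per consumer signals "no more work".
    for _ in range(N_CONSUMERS):
        q.put(None)

def consumer(q, results):
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        results.put(item * 2)  # hypothetical processing step

def run():
    q = mp.Queue()
    results = mp.Queue()
    workers = [mp.Process(target=consumer, args=(q, results))
               for _ in range(N_CONSUMERS)]
    for w in workers:
        w.start()
    feeder(q)
    for w in workers:
        w.join()
    return sum(results.get() for _ in range(N_ITEMS))

if __name__ == "__main__":
    print(run())  # 2 * sum(range(100)) == 9900
```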