Abstract situation. We have 2 sheep that can be used asynchronously at a time (Semaphore(2)) and 1 gate that can be used at a time. We want to send a sheep through the gate 2 times ("spend" in the code; each time needs 1 sheep and 1 gate and lasts 1 second) and feed a sheep 1 time (this needs 1 sheep and takes 2 seconds). Here's a code example:
import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 available sheep at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 available gate at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await asyncio.gather(
        sheep.acquire(),
        gate.acquire()
    )  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await asyncio.gather(
        sheep.acquire()
    )  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Output:
acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]
The problem is that the program doesn't run optimally. The reason is on line 3 of the output: spend 2 blocks a sheep even though it can't use it immediately, because it has to wait for the single gate blocked by spend 1. The second available sheep, which could be fed while spend 1 is running, is just wasted.
The optimal way for the program to work: spend 1 blocks 1 sheep and 1 gate; spend 2 sees that the gate is blocked, so it has no reason to block the second sheep immediately; feed 1 can block the second sheep and run while spend 1 is running. In this case the program would complete in 2 seconds instead of 3.
It's easy to see if you change the order inside main's gather.
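A scaled-down sketch of that experiment may help to reproduce the effect. It is not the code above: it uses bare semaphores, a shortened time unit, and a hypothetical helper named timed. It also assumes that gather schedules its arguments in the order given, which is how recent CPython versions behave but which was not guaranteed on older ones.

```python
import asyncio

UNIT = 0.1  # stands in for one second of the original example


async def spend(sheep, gate):
    # Naive acquisition: request both resources at once, as in the question.
    await asyncio.gather(sheep.acquire(), gate.acquire())
    await asyncio.sleep(1 * UNIT)
    sheep.release()
    gate.release()


async def feed(sheep):
    # Goes through gather as well, to mirror the original feed().
    await asyncio.gather(sheep.acquire())
    await asyncio.sleep(2 * UNIT)
    sheep.release()


async def timed(order):
    """Run the three jobs in the given gather order; return elapsed UNITs."""
    sheep = asyncio.Semaphore(2)
    gate = asyncio.Semaphore(1)
    make = {
        "spend 1": lambda: spend(sheep, gate),
        "spend 2": lambda: spend(sheep, gate),
        "feed 1": lambda: feed(sheep),
    }
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.gather(*(make[name]() for name in order))
    return (loop.time() - start) / UNIT


if __name__ == "__main__":
    lucky = asyncio.run(timed(["spend 1", "feed 1", "spend 2"]))
    unlucky = asyncio.run(timed(["spend 1", "spend 2", "feed 1"]))
    print("feed before spend 2: {:.1f} units".format(lucky))
    print("feed after spend 2:  {:.1f} units".format(unlucky))
```

Under the stated assumption, the second ordering should take about one unit longer, because spend 2 holds a sheep while it waits for the gate.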
Resources should be blocked not just in parallel but also in a coordinated way: the sheep and the gate should be blocked only if both the sheep and the gate are available. Something like this:
while sheep.locked() or gate.locked():
    await asyncio.sleep(0)  # yield to the event loop until both are free
await asyncio.gather(
    sheep.acquire(),
    gate.acquire()
)
But it doesn't look like a universal or nice solution. Maybe there is a pattern or just a nicer way to solve this problem? Any ideas are welcome.
You could implement an asynchronous context manager that handles multiple locks. This object should make sure it doesn't hold any lock while waiting for another non-available lock:
# Note: _ContextManagerMixin is a private asyncio class; it supplies "async with" support.
class multilock(asyncio.locks._ContextManagerMixin):

    def __init__(self, *locks):
        self.released = list(locks)  # locks still to be acquired
        self.acquired = []           # locks currently held

    async def acquire(self):
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                # About to wait: give back everything held so far.
                self.release()
            await lock.acquire()
            self.acquired.append(lock)

    def release(self):
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)
Example:
async def test(lock1, lock2):
    async with multilock(lock1, lock2):
        print('Do something')
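For readers who would rather not depend on the private mixin, here is a minimal self-contained sketch of the same idea with explicit __aenter__ and __aexit__ methods, applied to the sheep and gate scenario. The names MultiLock, job, and demo are hypothetical, and the time unit is shortened; treat it as an illustration under those assumptions rather than a drop-in replacement.

```python
import asyncio


class MultiLock:
    """Acquire several locks without holding any while waiting for another."""

    def __init__(self, *locks):
        self._pending = list(locks)  # not yet acquired
        self._held = []              # currently acquired

    async def acquire(self):
        while self._pending:
            lock = self._pending.pop()
            if lock.locked():
                # We are about to wait, so give back everything we hold.
                self.release()
            await lock.acquire()
            self._held.append(lock)

    def release(self):
        while self._held:
            lock = self._held.pop()
            lock.release()
            self._pending.append(lock)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()


async def job(name, locks, seconds, log):
    async with MultiLock(*locks):
        log.append(name)  # record the moment the job actually starts
        await asyncio.sleep(seconds)


async def demo(unit=0.05):
    sheep = asyncio.Semaphore(2)
    gate = asyncio.Semaphore(1)
    log = []
    await asyncio.gather(
        job("spend 1", (sheep, gate), 1 * unit, log),
        job("spend 2", (sheep, gate), 1 * unit, log),
        job("feed 1", (sheep,), 2 * unit, log),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch spend 2 finds the gate busy and waits for it without taking a sheep, so feed 1 can start while spend 1 is still running.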
Based on this solution I created a solution for this example. We need two things:
1. Add a locked() function to Sheep and Gate that checks whether the object can be acquired right now.
2. Add and use a new MultiAcquire task that acquires the objects only if all of them can be acquired right now (and otherwise suspends until a release event).
Here's the final code; MultiAcquire is the main part:
import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 available sheep at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 available gate at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class MultiAcquire(asyncio.Task):
    _check_lock = asyncio.Lock()  # serializes the check-and-create step below
    _release_event = asyncio.Event()  # signalled whenever any object is released

    def __init__(self, locks):
        super().__init__(self._task_coro())
        self._locks = locks
        # Here we use a decorator to subscribe to all release() calls,
        # _release_event would be set in this case:
        for l in self._locks:
            l.release = self._notify(l.release)

    async def _task_coro(self):
        while True:
            # Create task to acquire all locks and break on success:
            async with type(self)._check_lock:
                # The task is created only if all objects can be acquired:
                if not any(l.locked() for l in self._locks):
                    # Create task to acquire all objects:
                    task = asyncio.gather(*[l.acquire() for l in self._locks])
                    await asyncio.sleep(0)  # start task without waiting for it
                    break
            # Wait for any release() to try again:
            await type(self)._release_event.wait()
        # Wait for task:
        return await task

    def _notify(self, func):
        def wrapper(*args, **kwargs):
            type(self)._release_event.set()
            type(self)._release_event.clear()
            return func(*args, **kwargs)
        return wrapper


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Output:
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]
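The "acquire everything only when everything is free, otherwise sleep until something is released" idea can also be sketched with asyncio.Condition, which combines the lock and the wake-up signal in one object. This is a different technique from MultiAcquire, shown only for comparison; ResourcePool, take, and worker are hypothetical names, the resources are plain counters rather than the Sheep and Gate classes, and the time unit is shortened.

```python
import asyncio
from contextlib import asynccontextmanager


class ResourcePool:
    """Counters of named resources, guarded by a single Condition."""

    def __init__(self, **counts):
        self._free = dict(counts)
        # Created inside a running coroutine (see main) so it is tied to
        # the right event loop on older Python versions as well.
        self._cond = asyncio.Condition()

    @asynccontextmanager
    async def take(self, *names):
        async with self._cond:
            # Sleep until every requested resource is free at the same time,
            # then take them all without yielding in between.
            await self._cond.wait_for(
                lambda: all(self._free[n] > 0 for n in names))
            for n in names:
                self._free[n] -= 1
        try:
            yield
        finally:
            async with self._cond:
                for n in names:
                    self._free[n] += 1
                self._cond.notify_all()  # let waiters re-check their needs


async def worker(pool, name, needs, seconds, log):
    async with pool.take(*needs):
        log.append(name)  # record the moment the job actually starts
        await asyncio.sleep(seconds)


async def main(unit=0.05):
    pool = ResourcePool(sheep=2, gate=1)
    log = []
    await asyncio.gather(
        worker(pool, "spend 1", ("sheep", "gate"), 1 * unit, log),
        worker(pool, "spend 2", ("sheep", "gate"), 1 * unit, log),
        worker(pool, "feed 1", ("sheep",), 2 * unit, log),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

One design difference from MultiAcquire: nothing has to wrap release(), because the pool itself notifies waiters when resources are returned. The sketch assumes each name appears at most once per take() call.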