Synchronous blocking of multiple resources

2019-09-06 20:35发布

问题:

Abstract situation. We have 2 sheeps we can asynchronously use at time (Semaphore(2)) and 1 gate we can use at time. We want to spend sheep through gate 2 times (each time we need 1 sheep and 1 gate, it lasts 1 second) and feed sheep 1 time (it would take for 1 sheep and 2 seconds). Here's code example:

import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 avaliable sheeps at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 avaliable gate at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await asyncio.gather(
        sheep.acquire(), 
        gate.acquire()
    )  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await  asyncio.gather(
        sheep.acquire()
    )  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

acquire gate (spend 1)
acquire sheep (spend 1)
acquire sheep (spend 2) <-----
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
acquire sheep (feed 1)
acquire gate (spend 2)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
Feed sheep
release sheep (feed 1)
[Finished in 3.2s]

Problem is that program doesn't work optimal way, reason in output's line №3: spend 2 blocks sheep while it wouldn't be able to use it immediately, it should wait for single gate blocked by spend 1. Second available sheep that could be fed while spend 1 just spends time wasted:

Optimal way how program should work: spend 1 blocks 1 sheep and 1 gate, spend 2 see that gate it blocked and no reason to block second sheep immediately. feed 1 can blocks second sheep and run while spend 1 is running. In this case program would be completed in 2 seconds instead of 3:

It's easy to see if you change order inside main's gather.

Resources should be blocked not only parallel, but also synchronous, sheep and gate should be blocked only if sheep is available and gate is available. Something like that:

while sheep.locked() or gate.locked():
    asyncio.sleep(0)
await asyncio.gather(
    sheep.acquire(), 
    gate.acquire()
)

But it doesn't look like universal and nice solution. May be any pattern or just nicer way to solve this problem exists? Any ideas are welcome.

回答1:

You could implement an asynchronous context manager that handles multiple locks. This object should make sure it doesn't hold any lock while waiting for another non-available lock:

class multilock(asyncio.locks._ContextManagerMixin):

    def __init__(self, *locks):
        self.released = list(locks)
        self.acquired = []

    async def acquire(self):
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                self.release()
            await lock.acquire()
            self.acquired.append(lock)

    def release(self):
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

Example:

async def test(lock1, lock2):
    async with multilock(lock1, lock2):
        print('Do something')


回答2:

Based on this solution I created solution for this example. We need two things:

  1. add locked() function to Sheep and Gate, that's checking if object can be acquired right now

  2. add and use new MultiAcquire task that would acquire objects only if it all can be acquired right now (and suspend for release event otherwise)

Here's final code, see MultiAcquire - it's main:

import asyncio


class Sheep:
    _sem = asyncio.Semaphore(2)  # we have 2 avaliable sheeps at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class Gate:
    _sem = asyncio.Semaphore(1)  # we have 1 avaliable gate at time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        return type(self)._sem.locked()


class MultiAcquire(asyncio.Task):
    _check_lock = asyncio.Lock()  # to suspend for creating task that acquires objects
    _release_event = asyncio.Event()  # to suspend for any object was released

    def __init__(self, locks):
        super().__init__(self._task_coro())
        self._locks = locks
        # Here we use decorator to subscribe all release() calls,
        # _release_event would be set in this case:
        for l in self._locks:
            l.release = self._notify(l.release)

    async def _task_coro(self):
        while True:
            # Create task to acquire all locks and break on success:
            async with type(self)._check_lock:
                if not any(l.locked() for l in self._locks):  # task would be created only if all objects can be acquired
                    task = asyncio.gather(*[l.acquire() for l in self._locks])  # create task to acquire all objects 
                    await asyncio.sleep(0)  # start task without waiting for it
                    break
            # Wait for any release() to try again:
            await type(self)._release_event.wait()
        # Wait for task:
        return await task

    def _notify(self, func):
        def wrapper(*args, **kwargs):
            type(self)._release_event.set()
            type(self)._release_event.clear()
            return func(*args, **kwargs)
        return wrapper


async def spend(reason):
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    await asyncio.sleep(1)  # 1 second
    print('Spend sheep through a gate')
    sheep.release()
    gate.release()


async def feed(reason):
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    await asyncio.sleep(2)  # 2 seconds
    print('Feed sheep')
    sheep.release()


async def main():
    await asyncio.gather(
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2')
    )  # spend 2 times, feed 1 time


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]