classSemaphore(_ContextManagerMixin, mixins._LoopBoundMixin): """A Semaphore implementation. A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release(). Semaphores also support the context management protocol. The optional argument gives the initial value for the internal counter; it defaults to 1. If the value given is less than 0, ValueError is raised. """
def__init__(self, value=1, *, loop=mixins._marker): super().__init__(loop=loop) if value < 0: raise ValueError("Semaphore initial value must be >= 0") self._waiters = None self._value = value
def__repr__(self): res = super().__repr__() extra = 'locked'ifself.locked() elsef'unlocked, value:{self._value}' ifself._waiters: extra = f'{extra}, waiters:{len(self._waiters)}' returnf'<{res[1:-1]} [{extra}]>'
deflocked(self): """Returns True if semaphore cannot be acquired immediately.""" returnself._value == 0or ( any(not w.cancelled() for w in (self._waiters or ())))
asyncdefacquire(self): """Acquire a semaphore. If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True. """ ifnotself.locked(): self._value -= 1 returnTrue
ifself._waiters isNone: self._waiters = collections.deque() fut = self._get_loop().create_future() self._waiters.append(fut)
# Finally block should be called before the CancelledError # handling as we don't want CancelledError to call # _wake_up_first() and attempt to wake up itself. try: try: await fut finally: self._waiters.remove(fut) except exceptions.CancelledError: ifnot fut.cancelled(): self._value += 1 self._wake_up_next() raise
defrelease(self): """Release a semaphore, incrementing the internal counter by one. When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine. """ self._value += 1 self._wake_up_next()
def_wake_up_next(self): """Wake up the first waiter that isn't done.""" ifnotself._waiters: return
for fut inself._waiters: ifnot fut.done(): self._value -= 1 fut.set_result(True) return
class_ContextManagerMixin: # 异步的上下文管理器 asyncdef__aenter__(self): awaitself.acquire() # 如果锁住了就await # We have no use for the "as ..." clause in the with # statement for locks. returnNone