+++ /dev/null
-from __future__ import with_statement
-from eventlet import greenthread
-from eventlet import hubs
-from eventlet.timeout import Timeout
-
-
-class Semaphore(object):
-
- """An unbounded semaphore.
- Optionally initialize with a resource *count*, then :meth:`acquire` and
- :meth:`release` resources as needed. Attempting to :meth:`acquire` when
- *count* is zero suspends the calling greenthread until *count* becomes
- nonzero again.
-
- This is API-compatible with :class:`threading.Semaphore`.
-
- It is a context manager, and thus can be used in a with block::
-
- sem = Semaphore(2)
- with sem:
- do_some_stuff()
-
- If not specified, *value* defaults to 1.
-
- It is possible to limit acquire time::
-
- sem = Semaphore()
- ok = sem.acquire(timeout=0.1)
- # True if acquired, False if timed out.
-
- """
-
- def __init__(self, value=1):
- self.counter = value
- if value < 0:
- raise ValueError("Semaphore must be initialized with a positive "
- "number, got %s" % value)
- self._waiters = set()
-
- def __repr__(self):
- params = (self.__class__.__name__, hex(id(self)),
- self.counter, len(self._waiters))
- return '<%s at %s c=%s _w[%s]>' % params
-
- def __str__(self):
- params = (self.__class__.__name__, self.counter, len(self._waiters))
- return '<%s c=%s _w[%s]>' % params
-
- def locked(self):
- """Returns true if a call to acquire would block.
- """
- return self.counter <= 0
-
- def bounded(self):
- """Returns False; for consistency with
- :class:`~eventlet.semaphore.CappedSemaphore`.
- """
- return False
-
- def acquire(self, blocking=True, timeout=None):
- """Acquire a semaphore.
-
- When invoked without arguments: if the internal counter is larger than
- zero on entry, decrement it by one and return immediately. If it is zero
- on entry, block, waiting until some other thread has called release() to
- make it larger than zero. This is done with proper interlocking so that
- if multiple acquire() calls are blocked, release() will wake exactly one
- of them up. The implementation may pick one at random, so the order in
- which blocked threads are awakened should not be relied on. There is no
- return value in this case.
-
- When invoked with blocking set to true, do the same thing as when called
- without arguments, and return true.
-
- When invoked with blocking set to false, do not block. If a call without
- an argument would block, return false immediately; otherwise, do the
- same thing as when called without arguments, and return true.
- """
- if not blocking and timeout is not None:
- raise ValueError("can't specify timeout for non-blocking acquire")
- if not blocking and self.locked():
- return False
- if self.counter <= 0:
- self._waiters.add(greenthread.getcurrent())
- try:
- if timeout is not None:
- ok = False
- with Timeout(timeout, False):
- while self.counter <= 0:
- hubs.get_hub().switch()
- ok = True
- if not ok:
- return False
- else:
- while self.counter <= 0:
- hubs.get_hub().switch()
- finally:
- self._waiters.discard(greenthread.getcurrent())
- self.counter -= 1
- return True
-
- def __enter__(self):
- self.acquire()
-
- def release(self, blocking=True):
- """Release a semaphore, incrementing the internal counter by one. When
- it was zero on entry and another thread is waiting for it to become
- larger than zero again, wake up that thread.
-
- The *blocking* argument is for consistency with CappedSemaphore and is
- ignored
- """
- self.counter += 1
- if self._waiters:
- hubs.get_hub().schedule_call_global(0, self._do_acquire)
- return True
-
- def _do_acquire(self):
- if self._waiters and self.counter > 0:
- waiter = self._waiters.pop()
- waiter.switch()
-
- def __exit__(self, typ, val, tb):
- self.release()
-
- @property
- def balance(self):
- """An integer value that represents how many new calls to
- :meth:`acquire` or :meth:`release` would be needed to get the counter to
- 0. If it is positive, then its value is the number of acquires that can
- happen before the next acquire would block. If it is negative, it is
- the negative of the number of releases that would be required in order
- to make the counter 0 again (one more release would push the counter to
- 1 and unblock acquirers). It takes into account how many greenthreads
- are currently blocking in :meth:`acquire`.
- """
- # positive means there are free items
- # zero means there are no free items but nobody has requested one
- # negative means there are requests for items, but no items
- return self.counter - len(self._waiters)
-
-
-class BoundedSemaphore(Semaphore):
-
- """A bounded semaphore checks to make sure its current value doesn't exceed
- its initial value. If it does, ValueError is raised. In most situations
- semaphores are used to guard resources with limited capacity. If the
- semaphore is released too many times it's a sign of a bug. If not given,
- *value* defaults to 1.
- """
-
- def __init__(self, value=1):
- super(BoundedSemaphore, self).__init__(value)
- self.original_counter = value
-
- def release(self, blocking=True):
- """Release a semaphore, incrementing the internal counter by one. If
- the counter would exceed the initial value, raises ValueError. When
- it was zero on entry and another thread is waiting for it to become
- larger than zero again, wake up that thread.
-
- The *blocking* argument is for consistency with :class:`CappedSemaphore`
- and is ignored
- """
- if self.counter >= self.original_counter:
- raise ValueError("Semaphore released too many times")
- return super(BoundedSemaphore, self).release(blocking)
-
-
-class CappedSemaphore(object):
-
- """A blockingly bounded semaphore.
-
- Optionally initialize with a resource *count*, then :meth:`acquire` and
- :meth:`release` resources as needed. Attempting to :meth:`acquire` when
- *count* is zero suspends the calling greenthread until count becomes nonzero
- again. Attempting to :meth:`release` after *count* has reached *limit*
- suspends the calling greenthread until *count* becomes less than *limit*
- again.
-
- This has the same API as :class:`threading.Semaphore`, though its
- semantics and behavior differ subtly due to the upper limit on calls
- to :meth:`release`. It is **not** compatible with
- :class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
- instead of raising a ValueError.
-
- It is a context manager, and thus can be used in a with block::
-
- sem = CappedSemaphore(2)
- with sem:
- do_some_stuff()
- """
-
- def __init__(self, count, limit):
- if count < 0:
- raise ValueError("CappedSemaphore must be initialized with a "
- "positive number, got %s" % count)
- if count > limit:
- # accidentally, this also catches the case when limit is None
- raise ValueError("'count' cannot be more than 'limit'")
- self.lower_bound = Semaphore(count)
- self.upper_bound = Semaphore(limit - count)
-
- def __repr__(self):
- params = (self.__class__.__name__, hex(id(self)),
- self.balance, self.lower_bound, self.upper_bound)
- return '<%s at %s b=%s l=%s u=%s>' % params
-
- def __str__(self):
- params = (self.__class__.__name__, self.balance,
- self.lower_bound, self.upper_bound)
- return '<%s b=%s l=%s u=%s>' % params
-
- def locked(self):
- """Returns true if a call to acquire would block.
- """
- return self.lower_bound.locked()
-
- def bounded(self):
- """Returns true if a call to release would block.
- """
- return self.upper_bound.locked()
-
- def acquire(self, blocking=True):
- """Acquire a semaphore.
-
- When invoked without arguments: if the internal counter is larger than
- zero on entry, decrement it by one and return immediately. If it is zero
- on entry, block, waiting until some other thread has called release() to
- make it larger than zero. This is done with proper interlocking so that
- if multiple acquire() calls are blocked, release() will wake exactly one
- of them up. The implementation may pick one at random, so the order in
- which blocked threads are awakened should not be relied on. There is no
- return value in this case.
-
- When invoked with blocking set to true, do the same thing as when called
- without arguments, and return true.
-
- When invoked with blocking set to false, do not block. If a call without
- an argument would block, return false immediately; otherwise, do the
- same thing as when called without arguments, and return true.
- """
- if not blocking and self.locked():
- return False
- self.upper_bound.release()
- try:
- return self.lower_bound.acquire()
- except:
- self.upper_bound.counter -= 1
- # using counter directly means that it can be less than zero.
- # however I certainly don't need to wait here and I don't seem to have
- # a need to care about such inconsistency
- raise
-
- def __enter__(self):
- self.acquire()
-
- def release(self, blocking=True):
- """Release a semaphore. In this class, this behaves very much like
- an :meth:`acquire` but in the opposite direction.
-
- Imagine the docs of :meth:`acquire` here, but with every direction
- reversed. When calling this method, it will block if the internal
- counter is greater than or equal to *limit*.
- """
- if not blocking and self.bounded():
- return False
- self.lower_bound.release()
- try:
- return self.upper_bound.acquire()
- except:
- self.lower_bound.counter -= 1
- raise
-
- def __exit__(self, typ, val, tb):
- self.release()
-
- @property
- def balance(self):
- """An integer value that represents how many new calls to
- :meth:`acquire` or :meth:`release` would be needed to get the counter to
- 0. If it is positive, then its value is the number of acquires that can
- happen before the next acquire would block. If it is negative, it is
- the negative of the number of releases that would be required in order
- to make the counter 0 again (one more release would push the counter to
- 1 and unblock acquirers). It takes into account how many greenthreads
- are currently blocking in :meth:`acquire` and :meth:`release`.
- """
- return self.lower_bound.balance - self.upper_bound.balance