--- /dev/null
+from __future__ import with_statement
+from eventlet import greenthread
+from eventlet import hubs
+from eventlet.timeout import Timeout
+
+
+class Semaphore(object):
+
+ """An unbounded semaphore.
+ Optionally initialize with a resource *count*, then :meth:`acquire` and
+ :meth:`release` resources as needed. Attempting to :meth:`acquire` when
+ *count* is zero suspends the calling greenthread until *count* becomes
+ nonzero again.
+
+ This is API-compatible with :class:`threading.Semaphore`.
+
+ It is a context manager, and thus can be used in a with block::
+
+ sem = Semaphore(2)
+ with sem:
+ do_some_stuff()
+
+ If not specified, *value* defaults to 1.
+
+ It is possible to limit acquire time::
+
+ sem = Semaphore()
+ ok = sem.acquire(timeout=0.1)
+ # True if acquired, False if timed out.
+
+ """
+
+ def __init__(self, value=1):
+ self.counter = value
+ if value < 0:
+ raise ValueError("Semaphore must be initialized with a positive "
+ "number, got %s" % value)
+ self._waiters = set()
+
+ def __repr__(self):
+ params = (self.__class__.__name__, hex(id(self)),
+ self.counter, len(self._waiters))
+ return '<%s at %s c=%s _w[%s]>' % params
+
+ def __str__(self):
+ params = (self.__class__.__name__, self.counter, len(self._waiters))
+ return '<%s c=%s _w[%s]>' % params
+
+ def locked(self):
+ """Returns true if a call to acquire would block.
+ """
+ return self.counter <= 0
+
+ def bounded(self):
+ """Returns False; for consistency with
+ :class:`~eventlet.semaphore.CappedSemaphore`.
+ """
+ return False
+
+ def acquire(self, blocking=True, timeout=None):
+ """Acquire a semaphore.
+
+ When invoked without arguments: if the internal counter is larger than
+ zero on entry, decrement it by one and return immediately. If it is zero
+ on entry, block, waiting until some other thread has called release() to
+ make it larger than zero. This is done with proper interlocking so that
+ if multiple acquire() calls are blocked, release() will wake exactly one
+ of them up. The implementation may pick one at random, so the order in
+ which blocked threads are awakened should not be relied on. There is no
+ return value in this case.
+
+ When invoked with blocking set to true, do the same thing as when called
+ without arguments, and return true.
+
+ When invoked with blocking set to false, do not block. If a call without
+ an argument would block, return false immediately; otherwise, do the
+ same thing as when called without arguments, and return true.
+ """
+ if not blocking and timeout is not None:
+ raise ValueError("can't specify timeout for non-blocking acquire")
+ if not blocking and self.locked():
+ return False
+ if self.counter <= 0:
+ self._waiters.add(greenthread.getcurrent())
+ try:
+ if timeout is not None:
+ ok = False
+ with Timeout(timeout, False):
+ while self.counter <= 0:
+ hubs.get_hub().switch()
+ ok = True
+ if not ok:
+ return False
+ else:
+ while self.counter <= 0:
+ hubs.get_hub().switch()
+ finally:
+ self._waiters.discard(greenthread.getcurrent())
+ self.counter -= 1
+ return True
+
+ def __enter__(self):
+ self.acquire()
+
+ def release(self, blocking=True):
+ """Release a semaphore, incrementing the internal counter by one. When
+ it was zero on entry and another thread is waiting for it to become
+ larger than zero again, wake up that thread.
+
+ The *blocking* argument is for consistency with CappedSemaphore and is
+ ignored
+ """
+ self.counter += 1
+ if self._waiters:
+ hubs.get_hub().schedule_call_global(0, self._do_acquire)
+ return True
+
+ def _do_acquire(self):
+ if self._waiters and self.counter > 0:
+ waiter = self._waiters.pop()
+ waiter.switch()
+
+ def __exit__(self, typ, val, tb):
+ self.release()
+
+ @property
+ def balance(self):
+ """An integer value that represents how many new calls to
+ :meth:`acquire` or :meth:`release` would be needed to get the counter to
+ 0. If it is positive, then its value is the number of acquires that can
+ happen before the next acquire would block. If it is negative, it is
+ the negative of the number of releases that would be required in order
+ to make the counter 0 again (one more release would push the counter to
+ 1 and unblock acquirers). It takes into account how many greenthreads
+ are currently blocking in :meth:`acquire`.
+ """
+ # positive means there are free items
+ # zero means there are no free items but nobody has requested one
+ # negative means there are requests for items, but no items
+ return self.counter - len(self._waiters)
+
+
+class BoundedSemaphore(Semaphore):
+
+ """A bounded semaphore checks to make sure its current value doesn't exceed
+ its initial value. If it does, ValueError is raised. In most situations
+ semaphores are used to guard resources with limited capacity. If the
+ semaphore is released too many times it's a sign of a bug. If not given,
+ *value* defaults to 1.
+ """
+
+ def __init__(self, value=1):
+ super(BoundedSemaphore, self).__init__(value)
+ self.original_counter = value
+
+ def release(self, blocking=True):
+ """Release a semaphore, incrementing the internal counter by one. If
+ the counter would exceed the initial value, raises ValueError. When
+ it was zero on entry and another thread is waiting for it to become
+ larger than zero again, wake up that thread.
+
+ The *blocking* argument is for consistency with :class:`CappedSemaphore`
+ and is ignored
+ """
+ if self.counter >= self.original_counter:
+ raise ValueError("Semaphore released too many times")
+ return super(BoundedSemaphore, self).release(blocking)
+
+
+class CappedSemaphore(object):
+
+ """A blockingly bounded semaphore.
+
+ Optionally initialize with a resource *count*, then :meth:`acquire` and
+ :meth:`release` resources as needed. Attempting to :meth:`acquire` when
+ *count* is zero suspends the calling greenthread until count becomes nonzero
+ again. Attempting to :meth:`release` after *count* has reached *limit*
+ suspends the calling greenthread until *count* becomes less than *limit*
+ again.
+
+ This has the same API as :class:`threading.Semaphore`, though its
+ semantics and behavior differ subtly due to the upper limit on calls
+ to :meth:`release`. It is **not** compatible with
+ :class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
+ instead of raising a ValueError.
+
+ It is a context manager, and thus can be used in a with block::
+
+ sem = CappedSemaphore(2)
+ with sem:
+ do_some_stuff()
+ """
+
+ def __init__(self, count, limit):
+ if count < 0:
+ raise ValueError("CappedSemaphore must be initialized with a "
+ "positive number, got %s" % count)
+ if count > limit:
+ # accidentally, this also catches the case when limit is None
+ raise ValueError("'count' cannot be more than 'limit'")
+ self.lower_bound = Semaphore(count)
+ self.upper_bound = Semaphore(limit - count)
+
+ def __repr__(self):
+ params = (self.__class__.__name__, hex(id(self)),
+ self.balance, self.lower_bound, self.upper_bound)
+ return '<%s at %s b=%s l=%s u=%s>' % params
+
+ def __str__(self):
+ params = (self.__class__.__name__, self.balance,
+ self.lower_bound, self.upper_bound)
+ return '<%s b=%s l=%s u=%s>' % params
+
+ def locked(self):
+ """Returns true if a call to acquire would block.
+ """
+ return self.lower_bound.locked()
+
+ def bounded(self):
+ """Returns true if a call to release would block.
+ """
+ return self.upper_bound.locked()
+
+ def acquire(self, blocking=True):
+ """Acquire a semaphore.
+
+ When invoked without arguments: if the internal counter is larger than
+ zero on entry, decrement it by one and return immediately. If it is zero
+ on entry, block, waiting until some other thread has called release() to
+ make it larger than zero. This is done with proper interlocking so that
+ if multiple acquire() calls are blocked, release() will wake exactly one
+ of them up. The implementation may pick one at random, so the order in
+ which blocked threads are awakened should not be relied on. There is no
+ return value in this case.
+
+ When invoked with blocking set to true, do the same thing as when called
+ without arguments, and return true.
+
+ When invoked with blocking set to false, do not block. If a call without
+ an argument would block, return false immediately; otherwise, do the
+ same thing as when called without arguments, and return true.
+ """
+ if not blocking and self.locked():
+ return False
+ self.upper_bound.release()
+ try:
+ return self.lower_bound.acquire()
+ except:
+ self.upper_bound.counter -= 1
+ # using counter directly means that it can be less than zero.
+ # however I certainly don't need to wait here and I don't seem to have
+ # a need to care about such inconsistency
+ raise
+
+ def __enter__(self):
+ self.acquire()
+
+ def release(self, blocking=True):
+ """Release a semaphore. In this class, this behaves very much like
+ an :meth:`acquire` but in the opposite direction.
+
+ Imagine the docs of :meth:`acquire` here, but with every direction
+ reversed. When calling this method, it will block if the internal
+ counter is greater than or equal to *limit*.
+ """
+ if not blocking and self.bounded():
+ return False
+ self.lower_bound.release()
+ try:
+ return self.upper_bound.acquire()
+ except:
+ self.lower_bound.counter -= 1
+ raise
+
+ def __exit__(self, typ, val, tb):
+ self.release()
+
+ @property
+ def balance(self):
+ """An integer value that represents how many new calls to
+ :meth:`acquire` or :meth:`release` would be needed to get the counter to
+ 0. If it is positive, then its value is the number of acquires that can
+ happen before the next acquire would block. If it is negative, it is
+ the negative of the number of releases that would be required in order
+ to make the counter 0 again (one more release would push the counter to
+ 1 and unblock acquirers). It takes into account how many greenthreads
+ are currently blocking in :meth:`acquire` and :meth:`release`.
+ """
+ return self.lower_bound.balance - self.upper_bound.balance