1 from __future__ import with_statement
2 from eventlet import greenthread
3 from eventlet import hubs
4 from eventlet.timeout import Timeout
7 class Semaphore(object):
9 """An unbounded semaphore.
10 Optionally initialize with a resource *count*, then :meth:`acquire` and
11 :meth:`release` resources as needed. Attempting to :meth:`acquire` when
12 *count* is zero suspends the calling greenthread until *count* becomes
15 This is API-compatible with :class:`threading.Semaphore`.
17 It is a context manager, and thus can be used in a with block::
23 If not specified, *value* defaults to 1.
25 It is possible to limit acquire time::
28 ok = sem.acquire(timeout=0.1)
29 # True if acquired, False if timed out.
33 def __init__(self, value=1):
36 raise ValueError("Semaphore must be initialized with a positive "
37 "number, got %s" % value)
41 params = (self.__class__.__name__, hex(id(self)),
42 self.counter, len(self._waiters))
43 return '<%s at %s c=%s _w[%s]>' % params
46 params = (self.__class__.__name__, self.counter, len(self._waiters))
47 return '<%s c=%s _w[%s]>' % params
50 """Returns true if a call to acquire would block.
52 return self.counter <= 0
55 """Returns False; for consistency with
56 :class:`~eventlet.semaphore.CappedSemaphore`.
60 def acquire(self, blocking=True, timeout=None):
61 """Acquire a semaphore.
63 When invoked without arguments: if the internal counter is larger than
64 zero on entry, decrement it by one and return immediately. If it is zero
65 on entry, block, waiting until some other thread has called release() to
66 make it larger than zero. This is done with proper interlocking so that
67 if multiple acquire() calls are blocked, release() will wake exactly one
68 of them up. The implementation may pick one at random, so the order in
69 which blocked threads are awakened should not be relied on. There is no
70 return value in this case.
72 When invoked with blocking set to true, do the same thing as when called
73 without arguments, and return true.
75 When invoked with blocking set to false, do not block. If a call without
76 an argument would block, return false immediately; otherwise, do the
77 same thing as when called without arguments, and return true.
79 if not blocking and timeout is not None:
80 raise ValueError("can't specify timeout for non-blocking acquire")
81 if not blocking and self.locked():
84 self._waiters.add(greenthread.getcurrent())
86 if timeout is not None:
88 with Timeout(timeout, False):
89 while self.counter <= 0:
90 hubs.get_hub().switch()
95 while self.counter <= 0:
96 hubs.get_hub().switch()
98 self._waiters.discard(greenthread.getcurrent())
105 def release(self, blocking=True):
106 """Release a semaphore, incrementing the internal counter by one. When
107 it was zero on entry and another thread is waiting for it to become
108 larger than zero again, wake up that thread.
110 The *blocking* argument is for consistency with CappedSemaphore and is
115 hubs.get_hub().schedule_call_global(0, self._do_acquire)
118 def _do_acquire(self):
119 if self._waiters and self.counter > 0:
120 waiter = self._waiters.pop()
123 def __exit__(self, typ, val, tb):
128 """An integer value that represents how many new calls to
129 :meth:`acquire` or :meth:`release` would be needed to get the counter to
130 0. If it is positive, then its value is the number of acquires that can
131 happen before the next acquire would block. If it is negative, it is
132 the negative of the number of releases that would be required in order
133 to make the counter 0 again (one more release would push the counter to
134 1 and unblock acquirers). It takes into account how many greenthreads
135 are currently blocking in :meth:`acquire`.
137 # positive means there are free items
138 # zero means there are no free items but nobody has requested one
139 # negative means there are requests for items, but no items
140 return self.counter - len(self._waiters)
143 class BoundedSemaphore(Semaphore):
145 """A bounded semaphore checks to make sure its current value doesn't exceed
146 its initial value. If it does, ValueError is raised. In most situations
147 semaphores are used to guard resources with limited capacity. If the
148 semaphore is released too many times it's a sign of a bug. If not given,
149 *value* defaults to 1.
152 def __init__(self, value=1):
153 super(BoundedSemaphore, self).__init__(value)
154 self.original_counter = value
156 def release(self, blocking=True):
157 """Release a semaphore, incrementing the internal counter by one. If
158 the counter would exceed the initial value, raises ValueError. When
159 it was zero on entry and another thread is waiting for it to become
160 larger than zero again, wake up that thread.
162 The *blocking* argument is for consistency with :class:`CappedSemaphore`
165 if self.counter >= self.original_counter:
166 raise ValueError("Semaphore released too many times")
167 return super(BoundedSemaphore, self).release(blocking)
170 class CappedSemaphore(object):
172 """A blockingly bounded semaphore.
174 Optionally initialize with a resource *count*, then :meth:`acquire` and
175 :meth:`release` resources as needed. Attempting to :meth:`acquire` when
176 *count* is zero suspends the calling greenthread until count becomes nonzero
177 again. Attempting to :meth:`release` after *count* has reached *limit*
178 suspends the calling greenthread until *count* becomes less than *limit*
181 This has the same API as :class:`threading.Semaphore`, though its
182 semantics and behavior differ subtly due to the upper limit on calls
183 to :meth:`release`. It is **not** compatible with
184 :class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
185 instead of raising a ValueError.
187 It is a context manager, and thus can be used in a with block::
189 sem = CappedSemaphore(2)
194 def __init__(self, count, limit):
196 raise ValueError("CappedSemaphore must be initialized with a "
197 "positive number, got %s" % count)
199 # accidentally, this also catches the case when limit is None
200 raise ValueError("'count' cannot be more than 'limit'")
201 self.lower_bound = Semaphore(count)
202 self.upper_bound = Semaphore(limit - count)
205 params = (self.__class__.__name__, hex(id(self)),
206 self.balance, self.lower_bound, self.upper_bound)
207 return '<%s at %s b=%s l=%s u=%s>' % params
210 params = (self.__class__.__name__, self.balance,
211 self.lower_bound, self.upper_bound)
212 return '<%s b=%s l=%s u=%s>' % params
215 """Returns true if a call to acquire would block.
217 return self.lower_bound.locked()
220 """Returns true if a call to release would block.
222 return self.upper_bound.locked()
224 def acquire(self, blocking=True):
225 """Acquire a semaphore.
227 When invoked without arguments: if the internal counter is larger than
228 zero on entry, decrement it by one and return immediately. If it is zero
229 on entry, block, waiting until some other thread has called release() to
230 make it larger than zero. This is done with proper interlocking so that
231 if multiple acquire() calls are blocked, release() will wake exactly one
232 of them up. The implementation may pick one at random, so the order in
233 which blocked threads are awakened should not be relied on. There is no
234 return value in this case.
236 When invoked with blocking set to true, do the same thing as when called
237 without arguments, and return true.
239 When invoked with blocking set to false, do not block. If a call without
240 an argument would block, return false immediately; otherwise, do the
241 same thing as when called without arguments, and return true.
243 if not blocking and self.locked():
245 self.upper_bound.release()
247 return self.lower_bound.acquire()
249 self.upper_bound.counter -= 1
250 # using counter directly means that it can be less than zero.
251 # however I certainly don't need to wait here and I don't seem to have
252 # a need to care about such inconsistency
258 def release(self, blocking=True):
259 """Release a semaphore. In this class, this behaves very much like
260 an :meth:`acquire` but in the opposite direction.
262 Imagine the docs of :meth:`acquire` here, but with every direction
263 reversed. When calling this method, it will block if the internal
264 counter is greater than or equal to *limit*.
266 if not blocking and self.bounded():
268 self.lower_bound.release()
270 return self.upper_bound.acquire()
272 self.lower_bound.counter -= 1
275 def __exit__(self, typ, val, tb):
280 """An integer value that represents how many new calls to
281 :meth:`acquire` or :meth:`release` would be needed to get the counter to
282 0. If it is positive, then its value is the number of acquires that can
283 happen before the next acquire would block. If it is negative, it is
284 the negative of the number of releases that would be required in order
285 to make the counter 0 again (one more release would push the counter to
286 1 and unblock acquirers). It takes into account how many greenthreads
287 are currently blocking in :meth:`acquire` and :meth:`release`.
289 return self.lower_bound.balance - self.upper_bound.balance