1 from __future__ import with_statement
5 from eventlet import greenthread
6 from eventlet import hubs
7 from eventlet.timeout import Timeout
class Semaphore(object):
    """An unbounded semaphore.

    Optionally initialize with a resource *count*, then :meth:`acquire` and
    :meth:`release` resources as needed. Attempting to :meth:`acquire` when
    *count* is zero suspends the calling greenthread until *count* becomes
    nonzero again.

    This is API-compatible with :class:`threading.Semaphore`.

    It is a context manager, and thus can be used in a with block::

      sem = Semaphore(2)
      with sem:
        do_some_stuff()

    If not specified, *value* defaults to 1.

    It is possible to limit acquire time::

      sem = Semaphore()
      ok = sem.acquire(timeout=0.1)
      # True if acquired, False if timed out.

    """

    def __init__(self, value=1):
        """Create a semaphore holding *value* free slots.

        Raises :exc:`ValueError` if *value* is negative; zero is accepted
        and yields a semaphore that starts out locked.
        """
        if value < 0:
            # The tuple wrapper keeps the formatting correct even when
            # *value* is itself a tuple.
            raise ValueError("Semaphore must be initialized with a positive "
                             "number, got %s" % (value,))
        self.counter = value
        # Greenthreads currently suspended inside acquire(), oldest first.
        self._waiters = collections.deque()

    def __repr__(self):
        params = (self.__class__.__name__, hex(id(self)),
                  self.counter, len(self._waiters))
        return '<%s at %s c=%s _w[%s]>' % params

    def __str__(self):
        params = (self.__class__.__name__, self.counter, len(self._waiters))
        return '<%s c=%s _w[%s]>' % params

    def locked(self):
        """Returns true if a call to acquire would block.
        """
        return self.counter <= 0

    def bounded(self):
        """Returns False; for consistency with
        :class:`~eventlet.semaphore.CappedSemaphore`.
        """
        return False

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore.

        When invoked without arguments: if the internal counter is larger
        than zero on entry and no other greenthread is already queued,
        decrement it by one and return True immediately. Otherwise suspend
        the calling greenthread until a :meth:`release` makes the counter
        larger than zero, then decrement it and return True. The order in
        which blocked greenthreads are awakened should not be relied on.

        When invoked with *blocking* set to false, do not wait for a
        release: if the counter is not larger than zero, return False
        immediately; otherwise acquire and return True.

        When *timeout* is given, wait at most that many seconds and return
        False if the semaphore could not be acquired in time. ``None`` and
        ``-1`` both mean "wait without a time limit".

        Raises :exc:`ValueError` if *timeout* is negative (other than
        ``-1``), or if a *timeout* is combined with ``blocking=False``.
        """
        if timeout == -1:
            # -1 is accepted as another spelling of "no timeout".
            timeout = None
        if timeout is not None and timeout < 0:
            raise ValueError("timeout value must be strictly positive")
        if not blocking:
            if timeout is not None:
                raise ValueError(
                    "can't specify timeout for non-blocking acquire")
            if self.locked():
                return False
            # A zero timeout makes the wait branch below a single check
            # instead of an open-ended wait when other waiters are queued.
            timeout = 0

        if self.counter <= 0 or self._waiters:
            # Only the contended path needs to know who is waiting.
            current_thread = greenthread.getcurrent()
            if current_thread not in self._waiters:
                self._waiters.append(current_thread)
            try:
                if timeout is not None:
                    ok = False
                    # The second argument (False) makes the Timeout exit
                    # the with block silently, leaving ok unset.
                    with Timeout(timeout, False):
                        while self.counter <= 0:
                            hubs.get_hub().switch()
                        ok = True
                    if not ok:
                        return False
                else:
                    # If someone else is already in this wait loop, give them
                    # a chance to get out.
                    while True:
                        hubs.get_hub().switch()
                        if self.counter > 0:
                            break
            finally:
                try:
                    self._waiters.remove(current_thread)
                except ValueError:
                    # Fine if it's already been dropped.
                    pass

        self.counter -= 1
        return True

    def __enter__(self):
        self.acquire()

    def release(self, blocking=True):
        """Release a semaphore, incrementing the internal counter by one. When
        it was zero on entry and another thread is waiting for it to become
        larger than zero again, wake up that thread.

        The *blocking* argument is for consistency with CappedSemaphore and is
        ignored. Always returns True.
        """
        self.counter += 1
        if self._waiters:
            # Wake a waiter from the hub rather than switching to it here.
            hubs.get_hub().schedule_call_global(0, self._do_acquire)
        return True

    def _do_acquire(self):
        """Switch to the oldest waiter if a slot is still available."""
        if self._waiters and self.counter > 0:
            waiter = self._waiters.popleft()
            waiter.switch()

    def __exit__(self, typ, val, tb):
        self.release()

    @property
    def balance(self):
        """An integer value that represents how many new calls to
        :meth:`acquire` or :meth:`release` would be needed to get the counter to
        0. If it is positive, then its value is the number of acquires that can
        happen before the next acquire would block. If it is negative, it is
        the negative of the number of releases that would be required in order
        to make the counter 0 again (one more release would push the counter to
        1 and unblock acquirers). It takes into account how many greenthreads
        are currently blocking in :meth:`acquire`.
        """
        # positive means there are free items
        # zero means there are no free items but nobody has requested one
        # negative means there are requests for items, but no items
        return self.counter - len(self._waiters)
class BoundedSemaphore(Semaphore):
    """A semaphore that refuses to grow past its initial value.

    Semaphores usually guard a resource with limited capacity, so releasing
    one more often than it was acquired is a sign of a bug. This subclass
    turns that situation into a :exc:`ValueError`. If not given, *value*
    defaults to 1.
    """

    def __init__(self, value=1):
        super(BoundedSemaphore, self).__init__(value)
        # Remember the starting value; release() uses it as the ceiling.
        self.original_counter = value

    def release(self, blocking=True):
        """Release a semaphore, incrementing the internal counter by one.

        Raises :exc:`ValueError` if the counter is already at (or above)
        the initial value. Otherwise defers to :meth:`Semaphore.release`
        and returns its result.

        The *blocking* argument is for consistency with
        :class:`CappedSemaphore`; it is passed through unchanged.
        """
        at_ceiling = self.counter >= self.original_counter
        if at_ceiling:
            raise ValueError("Semaphore released too many times")
        parent = super(BoundedSemaphore, self)
        return parent.release(blocking)
class CappedSemaphore(object):
    """A blockingly bounded semaphore.

    Optionally initialize with a resource *count*, then :meth:`acquire` and
    :meth:`release` resources as needed. Attempting to :meth:`acquire` when
    *count* is zero suspends the calling greenthread until count becomes nonzero
    again. Attempting to :meth:`release` after *count* has reached *limit*
    suspends the calling greenthread until *count* becomes less than *limit*
    again.

    This has the same API as :class:`threading.Semaphore`, though its
    semantics and behavior differ subtly due to the upper limit on calls
    to :meth:`release`. It is **not** compatible with
    :class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
    instead of raising a ValueError.

    It is a context manager, and thus can be used in a with block::

      sem = CappedSemaphore(2, 2)
      with sem:
        do_some_stuff()
    """

    def __init__(self, count, limit):
        """Create a semaphore with *count* free slots, capped at *limit*.

        Raises :exc:`ValueError` if *count* is negative, if *limit* is
        None, or if *count* exceeds *limit*.
        """
        if count < 0:
            raise ValueError("CappedSemaphore must be initialized with a "
                             "positive number, got %s" % (count,))
        # None is rejected explicitly: on Python 3 the comparison
        # ``count > None`` raises TypeError instead of evaluating true.
        if limit is None or count > limit:
            raise ValueError("'count' cannot be more than 'limit'")
        # lower_bound tracks free slots; acquire() takes from it.
        self.lower_bound = Semaphore(count)
        # upper_bound tracks remaining headroom; release() takes from it.
        self.upper_bound = Semaphore(limit - count)

    def __repr__(self):
        params = (self.__class__.__name__, hex(id(self)),
                  self.balance, self.lower_bound, self.upper_bound)
        return '<%s at %s b=%s l=%s u=%s>' % params

    def __str__(self):
        params = (self.__class__.__name__, self.balance,
                  self.lower_bound, self.upper_bound)
        return '<%s b=%s l=%s u=%s>' % params

    def locked(self):
        """Returns true if a call to acquire would block.
        """
        return self.lower_bound.locked()

    def bounded(self):
        """Returns true if a call to release would block.
        """
        return self.upper_bound.locked()

    def acquire(self, blocking=True):
        """Acquire a semaphore.

        When invoked with *blocking* set to false and :meth:`locked` is
        true, return False immediately without changing any state.

        Otherwise release one unit of headroom on the upper bound, then
        acquire the lower bound and return its result. The acquire on the
        lower bound may suspend the calling greenthread until another
        greenthread calls :meth:`release`.

        If acquiring the lower bound raises, the headroom adjustment is
        undone before the exception propagates.
        """
        if not blocking and self.locked():
            return False
        self.upper_bound.release()
        try:
            return self.lower_bound.acquire()
        except BaseException:
            self.upper_bound.counter -= 1
            # using counter directly means that it can be less than zero.
            # however I certainly don't need to wait here and I don't seem to have
            # a need to care about such inconsistency
            raise

    def __enter__(self):
        self.acquire()

    def release(self, blocking=True):
        """Release a semaphore. In this class, this behaves very much like
        an :meth:`acquire` but in the opposite direction.

        When invoked with *blocking* set to false and :meth:`bounded` is
        true, return False immediately without changing any state.

        Otherwise release the lower bound, then acquire one unit of
        headroom from the upper bound and return its result. That acquire
        may suspend the calling greenthread while the internal counter is
        greater than or equal to *limit*.

        If acquiring the upper bound raises, the lower-bound adjustment is
        undone before the exception propagates.
        """
        if not blocking and self.bounded():
            return False
        self.lower_bound.release()
        try:
            return self.upper_bound.acquire()
        except BaseException:
            # Undo the release above; see the matching note in acquire().
            self.lower_bound.counter -= 1
            raise

    def __exit__(self, typ, val, tb):
        self.release()

    @property
    def balance(self):
        """An integer value that represents how many new calls to
        :meth:`acquire` or :meth:`release` would be needed to get the counter to
        0. If it is positive, then its value is the number of acquires that can
        happen before the next acquire would block. If it is negative, it is
        the negative of the number of releases that would be required in order
        to make the counter 0 again (one more release would push the counter to
        1 and unblock acquirers). It takes into account how many greenthreads
        are currently blocking in :meth:`acquire` and :meth:`release`.
        """
        return self.lower_bound.balance - self.upper_bound.balance