Add python-eventlet package to MOS 8.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / eventlet / semaphore.py
1 from __future__ import with_statement
2
3 import collections
4
5 from eventlet import greenthread
6 from eventlet import hubs
7 from eventlet.timeout import Timeout
8
9
10 class Semaphore(object):
11
12     """An unbounded semaphore.
13     Optionally initialize with a resource *count*, then :meth:`acquire` and
14     :meth:`release` resources as needed. Attempting to :meth:`acquire` when
15     *count* is zero suspends the calling greenthread until *count* becomes
16     nonzero again.
17
18     This is API-compatible with :class:`threading.Semaphore`.
19
20     It is a context manager, and thus can be used in a with block::
21
22       sem = Semaphore(2)
23       with sem:
24         do_some_stuff()
25
26     If not specified, *value* defaults to 1.
27
28     It is possible to limit acquire time::
29
30       sem = Semaphore()
31       ok = sem.acquire(timeout=0.1)
32       # True if acquired, False if timed out.
33
34     """
35
36     def __init__(self, value=1):
37         self.counter = value
38         if value < 0:
39             raise ValueError("Semaphore must be initialized with a positive "
40                              "number, got %s" % value)
41         self._waiters = collections.deque()
42
43     def __repr__(self):
44         params = (self.__class__.__name__, hex(id(self)),
45                   self.counter, len(self._waiters))
46         return '<%s at %s c=%s _w[%s]>' % params
47
48     def __str__(self):
49         params = (self.__class__.__name__, self.counter, len(self._waiters))
50         return '<%s c=%s _w[%s]>' % params
51
52     def locked(self):
53         """Returns true if a call to acquire would block.
54         """
55         return self.counter <= 0
56
57     def bounded(self):
58         """Returns False; for consistency with
59         :class:`~eventlet.semaphore.CappedSemaphore`.
60         """
61         return False
62
63     def acquire(self, blocking=True, timeout=None):
64         """Acquire a semaphore.
65
66         When invoked without arguments: if the internal counter is larger than
67         zero on entry, decrement it by one and return immediately. If it is zero
68         on entry, block, waiting until some other thread has called release() to
69         make it larger than zero. This is done with proper interlocking so that
70         if multiple acquire() calls are blocked, release() will wake exactly one
71         of them up. The implementation may pick one at random, so the order in
72         which blocked threads are awakened should not be relied on. There is no
73         return value in this case.
74
75         When invoked with blocking set to true, do the same thing as when called
76         without arguments, and return true.
77
78         When invoked with blocking set to false, do not block. If a call without
79         an argument would block, return false immediately; otherwise, do the
80         same thing as when called without arguments, and return true.
81
82         Timeout value must be strictly positive.
83         """
84         if timeout == -1:
85             timeout = None
86         if timeout is not None and timeout < 0:
87             raise ValueError("timeout value must be strictly positive")
88         if not blocking:
89             if timeout is not None:
90                 raise ValueError("can't specify timeout for non-blocking acquire")
91             timeout = 0
92         if not blocking and self.locked():
93             return False
94
95         current_thread = greenthread.getcurrent()
96
97         if self.counter <= 0 or self._waiters:
98             if current_thread not in self._waiters:
99                 self._waiters.append(current_thread)
100             try:
101                 if timeout is not None:
102                     ok = False
103                     with Timeout(timeout, False):
104                         while self.counter <= 0:
105                             hubs.get_hub().switch()
106                         ok = True
107                     if not ok:
108                         return False
109                 else:
110                     # If someone else is already in this wait loop, give them
111                     # a chance to get out.
112                     while True:
113                         hubs.get_hub().switch()
114                         if self.counter > 0:
115                             break
116             finally:
117                 try:
118                     self._waiters.remove(current_thread)
119                 except ValueError:
120                     # Fine if its already been dropped.
121                     pass
122
123         self.counter -= 1
124         return True
125
126     def __enter__(self):
127         self.acquire()
128
129     def release(self, blocking=True):
130         """Release a semaphore, incrementing the internal counter by one. When
131         it was zero on entry and another thread is waiting for it to become
132         larger than zero again, wake up that thread.
133
134         The *blocking* argument is for consistency with CappedSemaphore and is
135         ignored
136         """
137         self.counter += 1
138         if self._waiters:
139             hubs.get_hub().schedule_call_global(0, self._do_acquire)
140         return True
141
142     def _do_acquire(self):
143         if self._waiters and self.counter > 0:
144             waiter = self._waiters.popleft()
145             waiter.switch()
146
147     def __exit__(self, typ, val, tb):
148         self.release()
149
150     @property
151     def balance(self):
152         """An integer value that represents how many new calls to
153         :meth:`acquire` or :meth:`release` would be needed to get the counter to
154         0.  If it is positive, then its value is the number of acquires that can
155         happen before the next acquire would block.  If it is negative, it is
156         the negative of the number of releases that would be required in order
157         to make the counter 0 again (one more release would push the counter to
158         1 and unblock acquirers).  It takes into account how many greenthreads
159         are currently blocking in :meth:`acquire`.
160         """
161         # positive means there are free items
162         # zero means there are no free items but nobody has requested one
163         # negative means there are requests for items, but no items
164         return self.counter - len(self._waiters)
165
166
167 class BoundedSemaphore(Semaphore):
168
169     """A bounded semaphore checks to make sure its current value doesn't exceed
170     its initial value. If it does, ValueError is raised. In most situations
171     semaphores are used to guard resources with limited capacity. If the
172     semaphore is released too many times it's a sign of a bug. If not given,
173     *value* defaults to 1.
174     """
175
176     def __init__(self, value=1):
177         super(BoundedSemaphore, self).__init__(value)
178         self.original_counter = value
179
180     def release(self, blocking=True):
181         """Release a semaphore, incrementing the internal counter by one. If
182         the counter would exceed the initial value, raises ValueError.  When
183         it was zero on entry and another thread is waiting for it to become
184         larger than zero again, wake up that thread.
185
186         The *blocking* argument is for consistency with :class:`CappedSemaphore`
187         and is ignored
188         """
189         if self.counter >= self.original_counter:
190             raise ValueError("Semaphore released too many times")
191         return super(BoundedSemaphore, self).release(blocking)
192
193
194 class CappedSemaphore(object):
195
196     """A blockingly bounded semaphore.
197
198     Optionally initialize with a resource *count*, then :meth:`acquire` and
199     :meth:`release` resources as needed. Attempting to :meth:`acquire` when
200     *count* is zero suspends the calling greenthread until count becomes nonzero
201     again.  Attempting to :meth:`release` after *count* has reached *limit*
202     suspends the calling greenthread until *count* becomes less than *limit*
203     again.
204
205     This has the same API as :class:`threading.Semaphore`, though its
206     semantics and behavior differ subtly due to the upper limit on calls
207     to :meth:`release`.  It is **not** compatible with
208     :class:`threading.BoundedSemaphore` because it blocks when reaching *limit*
209     instead of raising a ValueError.
210
211     It is a context manager, and thus can be used in a with block::
212
213       sem = CappedSemaphore(2)
214       with sem:
215         do_some_stuff()
216     """
217
218     def __init__(self, count, limit):
219         if count < 0:
220             raise ValueError("CappedSemaphore must be initialized with a "
221                              "positive number, got %s" % count)
222         if count > limit:
223             # accidentally, this also catches the case when limit is None
224             raise ValueError("'count' cannot be more than 'limit'")
225         self.lower_bound = Semaphore(count)
226         self.upper_bound = Semaphore(limit - count)
227
228     def __repr__(self):
229         params = (self.__class__.__name__, hex(id(self)),
230                   self.balance, self.lower_bound, self.upper_bound)
231         return '<%s at %s b=%s l=%s u=%s>' % params
232
233     def __str__(self):
234         params = (self.__class__.__name__, self.balance,
235                   self.lower_bound, self.upper_bound)
236         return '<%s b=%s l=%s u=%s>' % params
237
238     def locked(self):
239         """Returns true if a call to acquire would block.
240         """
241         return self.lower_bound.locked()
242
243     def bounded(self):
244         """Returns true if a call to release would block.
245         """
246         return self.upper_bound.locked()
247
248     def acquire(self, blocking=True):
249         """Acquire a semaphore.
250
251         When invoked without arguments: if the internal counter is larger than
252         zero on entry, decrement it by one and return immediately. If it is zero
253         on entry, block, waiting until some other thread has called release() to
254         make it larger than zero. This is done with proper interlocking so that
255         if multiple acquire() calls are blocked, release() will wake exactly one
256         of them up. The implementation may pick one at random, so the order in
257         which blocked threads are awakened should not be relied on. There is no
258         return value in this case.
259
260         When invoked with blocking set to true, do the same thing as when called
261         without arguments, and return true.
262
263         When invoked with blocking set to false, do not block. If a call without
264         an argument would block, return false immediately; otherwise, do the
265         same thing as when called without arguments, and return true.
266         """
267         if not blocking and self.locked():
268             return False
269         self.upper_bound.release()
270         try:
271             return self.lower_bound.acquire()
272         except:
273             self.upper_bound.counter -= 1
274             # using counter directly means that it can be less than zero.
275             # however I certainly don't need to wait here and I don't seem to have
276             # a need to care about such inconsistency
277             raise
278
279     def __enter__(self):
280         self.acquire()
281
282     def release(self, blocking=True):
283         """Release a semaphore.  In this class, this behaves very much like
284         an :meth:`acquire` but in the opposite direction.
285
286         Imagine the docs of :meth:`acquire` here, but with every direction
287         reversed.  When calling this method, it will block if the internal
288         counter is greater than or equal to *limit*.
289         """
290         if not blocking and self.bounded():
291             return False
292         self.lower_bound.release()
293         try:
294             return self.upper_bound.acquire()
295         except:
296             self.lower_bound.counter -= 1
297             raise
298
299     def __exit__(self, typ, val, tb):
300         self.release()
301
302     @property
303     def balance(self):
304         """An integer value that represents how many new calls to
305         :meth:`acquire` or :meth:`release` would be needed to get the counter to
306         0.  If it is positive, then its value is the number of acquires that can
307         happen before the next acquire would block.  If it is negative, it is
308         the negative of the number of releases that would be required in order
309         to make the counter 0 again (one more release would push the counter to
310         1 and unblock acquirers).  It takes into account how many greenthreads
311         are currently blocking in :meth:`acquire` and :meth:`release`.
312         """
313         return self.lower_bound.balance - self.upper_bound.balance