Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / __init__.py
1 # package is named tests, not test, so it won't be confused with test in stdlib
2 from __future__ import print_function
3
4 import errno
5 import gc
6 import os
7 try:
8     import resource
9 except ImportError:
10     resource = None
11 import signal
12 import subprocess
13 import sys
14 import unittest
15 import warnings
16
17 import eventlet
18 from eventlet import tpool
19
20
21 # convenience for importers
22 main = unittest.main
23
24
25 def skipped(func):
26     """ Decorator that marks a function as skipped.  Uses nose's SkipTest exception
27     if installed.  Without nose, this will count skipped tests as passing tests."""
28     try:
29         from nose.plugins.skip import SkipTest
30
31         def skipme(*a, **k):
32             raise SkipTest()
33         skipme.__name__ = func.__name__
34         return skipme
35     except ImportError:
36         # no nose, we'll just skip the test ourselves
37         def skipme(*a, **k):
38             print(("Skipping {0}".format(func.__name__)))
39         skipme.__name__ = func.__name__
40         return skipme
41
42
43 def skip_if(condition):
44     """ Decorator that skips a test if the *condition* evaluates True.
45     *condition* can be a boolean or a callable that accepts one argument.
46     The callable will be called with the function to be decorated, and
47     should return True to skip the test.
48     """
49     def skipped_wrapper(func):
50         def wrapped(*a, **kw):
51             if isinstance(condition, bool):
52                 result = condition
53             else:
54                 result = condition(func)
55             if result:
56                 return skipped(func)(*a, **kw)
57             else:
58                 return func(*a, **kw)
59         wrapped.__name__ = func.__name__
60         return wrapped
61     return skipped_wrapper
62
63
64 def skip_unless(condition):
65     """ Decorator that skips a test if the *condition* does not return True.
66     *condition* can be a boolean or a callable that accepts one argument.
67     The callable will be called with the  function to be decorated, and
68     should return True if the condition is satisfied.
69     """
70     def skipped_wrapper(func):
71         def wrapped(*a, **kw):
72             if isinstance(condition, bool):
73                 result = condition
74             else:
75                 result = condition(func)
76             if not result:
77                 return skipped(func)(*a, **kw)
78             else:
79                 return func(*a, **kw)
80         wrapped.__name__ = func.__name__
81         return wrapped
82     return skipped_wrapper
83
84
85 def using_pyevent(_f):
86     from eventlet.hubs import get_hub
87     return 'pyevent' in type(get_hub()).__module__
88
89
90 def skip_with_pyevent(func):
91     """ Decorator that skips a test if we're using the pyevent hub."""
92     return skip_if(using_pyevent)(func)
93
94
95 def skip_on_windows(func):
96     """ Decorator that skips a test on Windows."""
97     return skip_if(sys.platform.startswith('win'))(func)
98
99
100 def skip_if_no_itimer(func):
101     """ Decorator that skips a test if the `itimer` module isn't found """
102     has_itimer = False
103     try:
104         import itimer
105         has_itimer = True
106     except ImportError:
107         pass
108     return skip_unless(has_itimer)(func)
109
110
111 def skip_if_no_ssl(func):
112     """ Decorator that skips a test if SSL is not available."""
113     try:
114         import eventlet.green.ssl
115         return func
116     except ImportError:
117         try:
118             import eventlet.green.OpenSSL
119             return func
120         except ImportError:
121             return skipped(func)
122
123
124 class TestIsTakingTooLong(Exception):
125     """ Custom exception class to be raised when a test's runtime exceeds a limit. """
126     pass
127
128
129 class LimitedTestCase(unittest.TestCase):
130     """ Unittest subclass that adds a timeout to all tests.  Subclasses must
131     be sure to call the LimitedTestCase setUp and tearDown methods.  The default
132     timeout is 1 second, change it by setting TEST_TIMEOUT to the desired
133     quantity."""
134
135     TEST_TIMEOUT = 1
136
137     def setUp(self):
138         self.previous_alarm = None
139         self.timer = eventlet.Timeout(self.TEST_TIMEOUT,
140                                       TestIsTakingTooLong(self.TEST_TIMEOUT))
141
142     def reset_timeout(self, new_timeout):
143         """Changes the timeout duration; only has effect during one test.
144         `new_timeout` can be int or float.
145         """
146         self.timer.cancel()
147         self.timer = eventlet.Timeout(new_timeout,
148                                       TestIsTakingTooLong(new_timeout))
149
150     def set_alarm(self, new_timeout):
151         """Call this in the beginning of your test if you expect busy loops.
152         Only has effect during one test.
153         `new_timeout` must be int.
154         """
155         def sig_alarm_handler(sig, frame):
156             # Could arm previous alarm but test is failed anyway
157             # seems to be no point in restoring previous state.
158             raise TestIsTakingTooLong(new_timeout)
159
160         self.previous_alarm = (
161             signal.signal(signal.SIGALRM, sig_alarm_handler),
162             signal.alarm(new_timeout),
163         )
164
165     def tearDown(self):
166         self.timer.cancel()
167         if self.previous_alarm:
168             signal.signal(signal.SIGALRM, self.previous_alarm[0])
169             signal.alarm(self.previous_alarm[1])
170
171         tpool.killall()
172         gc.collect()
173         eventlet.sleep(0)
174         verify_hub_empty()
175
176     def assert_less_than(self, a, b, msg=None):
177         msg = msg or "%s not less than %s" % (a, b)
178         assert a < b, msg
179
180     assertLessThan = assert_less_than
181
182     def assert_less_than_equal(self, a, b, msg=None):
183         msg = msg or "%s not less than or equal to %s" % (a, b)
184         assert a <= b, msg
185
186     assertLessThanEqual = assert_less_than_equal
187
188
189 def check_idle_cpu_usage(duration, allowed_part):
190     if resource is None:
191         # TODO: use https://code.google.com/p/psutil/
192         from nose.plugins.skip import SkipTest
193         raise SkipTest('CPU usage testing not supported (`import resource` failed)')
194
195     r1 = resource.getrusage(resource.RUSAGE_SELF)
196     eventlet.sleep(duration)
197     r2 = resource.getrusage(resource.RUSAGE_SELF)
198     utime = r2.ru_utime - r1.ru_utime
199     stime = r2.ru_stime - r1.ru_stime
200     assert utime + stime < duration * allowed_part, \
201         "CPU usage over limit: user %.0f%% sys %.0f%% allowed %.0f%%" % (
202             utime / duration * 100, stime / duration * 100,
203             allowed_part * 100)
204
205
206 def verify_hub_empty():
207     from eventlet import hubs
208     hub = hubs.get_hub()
209     num_readers = len(hub.get_readers())
210     num_writers = len(hub.get_writers())
211     num_timers = hub.get_timers_count()
212     assert num_readers == 0 and num_writers == 0, "Readers: %s Writers: %s" % (
213         num_readers, num_writers)
214
215
216 def find_command(command):
217     for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
218         p = os.path.join(dir, command)
219         if os.access(p, os.X_OK):
220             return p
221     raise IOError(errno.ENOENT, 'Command not found: %r' % command)
222
223
224 def silence_warnings(func):
225     def wrapper(*args, **kw):
226         warnings.simplefilter('ignore', DeprecationWarning)
227         try:
228             return func(*args, **kw)
229         finally:
230             warnings.simplefilter('default', DeprecationWarning)
231     wrapper.__name__ = func.__name__
232     return wrapper
233
234
235 def get_database_auth():
236     """Retrieves a dict of connection parameters for connecting to test databases.
237
238     Authentication parameters are highly-machine specific, so
239     get_database_auth gets its information from either environment
240     variables or a config file.  The environment variable is
241     "EVENTLET_DB_TEST_AUTH" and it should contain a json object.  If
242     this environment variable is present, it's used and config files
243     are ignored.  If it's not present, it looks in the local directory
244     (tests) and in the user's home directory for a file named
245     ".test_dbauth", which contains a json map of parameters to the
246     connect function.
247     """
248     import os
249     retval = {
250         'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
251         'psycopg2': {'user': 'test'},
252     }
253     try:
254         import json
255     except ImportError:
256         try:
257             import simplejson as json
258         except ImportError:
259             print("No json implementation, using baked-in db credentials.")
260             return retval
261
262     if 'EVENTLET_DB_TEST_AUTH' in os.environ:
263         return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
264
265     files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
266              os.path.join(os.path.expanduser('~'), '.test_dbauth')]
267     for f in files:
268         try:
269             auth_utf8 = json.load(open(f))
270             # Have to convert unicode objects to str objects because
271             # mysqldb is dum. Using a doubly-nested list comprehension
272             # because we know that the structure is a two-level dict.
273             return dict(
274                 [(str(modname), dict(
275                     [(str(k), str(v)) for k, v in connectargs.items()]))
276                  for modname, connectargs in auth_utf8.items()])
277         except IOError:
278             pass
279     return retval
280
281
282 def run_python(path):
283     if not path.endswith('.py'):
284         path += '.py'
285     path = os.path.abspath(path)
286     dir_ = os.path.dirname(path)
287     new_env = os.environ.copy()
288     new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [dir_])
289     p = subprocess.Popen(
290         [sys.executable, path],
291         env=new_env,
292         stderr=subprocess.STDOUT,
293         stdin=subprocess.PIPE,
294         stdout=subprocess.PIPE,
295     )
296     output, _ = p.communicate()
297     return output
298
299
300 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
301 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')