26c0c2ec5ab49c5102314dcb7991e9dece9b0bf1
[packages/trusty/python-eventlet.git] / python-eventlet / tests / __init__.py
1 # package is named tests, not test, so it won't be confused with test in stdlib
2 from __future__ import print_function
3
4 import contextlib
5 import errno
6 import functools
7 import gc
8 import json
9 import os
10 try:
11     import resource
12 except ImportError:
13     resource = None
14 import signal
15 import subprocess
16 import sys
17 import unittest
18 import warnings
19
20 from nose.plugins.skip import SkipTest
21
22 import eventlet
23 from eventlet import tpool
24
25
26 # convenience for importers
27 main = unittest.main
28
29
30 @contextlib.contextmanager
31 def assert_raises(exc_type):
32     try:
33         yield
34     except exc_type:
35         pass
36     else:
37         name = str(exc_type)
38         try:
39             name = exc_type.__name__
40         except AttributeError:
41             pass
42         assert False, 'Expected exception {0}'.format(name)
43
44
45 def skipped(func, *decorator_args):
46     """Decorator that marks a function as skipped.
47     """
48     @functools.wraps(func)
49     def wrapped(*a, **k):
50         raise SkipTest(*decorator_args)
51
52     return wrapped
53
54
55 def skip_if(condition):
56     """ Decorator that skips a test if the *condition* evaluates True.
57     *condition* can be a boolean or a callable that accepts one argument.
58     The callable will be called with the function to be decorated, and
59     should return True to skip the test.
60     """
61     def skipped_wrapper(func):
62         @functools.wraps(func)
63         def wrapped(*a, **kw):
64             if isinstance(condition, bool):
65                 result = condition
66             else:
67                 result = condition(func)
68             if result:
69                 raise SkipTest()
70             else:
71                 return func(*a, **kw)
72         return wrapped
73     return skipped_wrapper
74
75
76 def skip_unless(condition):
77     """ Decorator that skips a test if the *condition* does not return True.
78     *condition* can be a boolean or a callable that accepts one argument.
79     The callable will be called with the  function to be decorated, and
80     should return True if the condition is satisfied.
81     """
82     def skipped_wrapper(func):
83         @functools.wraps(func)
84         def wrapped(*a, **kw):
85             if isinstance(condition, bool):
86                 result = condition
87             else:
88                 result = condition(func)
89             if not result:
90                 raise SkipTest()
91             else:
92                 return func(*a, **kw)
93         return wrapped
94     return skipped_wrapper
95
96
97 def using_pyevent(_f):
98     from eventlet.hubs import get_hub
99     return 'pyevent' in type(get_hub()).__module__
100
101
102 def skip_with_pyevent(func):
103     """ Decorator that skips a test if we're using the pyevent hub."""
104     return skip_if(using_pyevent)(func)
105
106
107 def skip_on_windows(func):
108     """ Decorator that skips a test on Windows."""
109     return skip_if(sys.platform.startswith('win'))(func)
110
111
112 def skip_if_no_itimer(func):
113     """ Decorator that skips a test if the `itimer` module isn't found """
114     has_itimer = False
115     try:
116         import itimer
117         has_itimer = True
118     except ImportError:
119         pass
120     return skip_unless(has_itimer)(func)
121
122
123 def skip_if_no_ssl(func):
124     """ Decorator that skips a test if SSL is not available."""
125     try:
126         import eventlet.green.ssl
127         return func
128     except ImportError:
129         try:
130             import eventlet.green.OpenSSL
131             return func
132         except ImportError:
133             return skipped(func)
134
135
136 class TestIsTakingTooLong(Exception):
137     """ Custom exception class to be raised when a test's runtime exceeds a limit. """
138     pass
139
140
141 class LimitedTestCase(unittest.TestCase):
142     """ Unittest subclass that adds a timeout to all tests.  Subclasses must
143     be sure to call the LimitedTestCase setUp and tearDown methods.  The default
144     timeout is 1 second, change it by setting TEST_TIMEOUT to the desired
145     quantity."""
146
147     TEST_TIMEOUT = 1
148
149     def setUp(self):
150         self.previous_alarm = None
151         self.timer = eventlet.Timeout(self.TEST_TIMEOUT,
152                                       TestIsTakingTooLong(self.TEST_TIMEOUT))
153
154     def reset_timeout(self, new_timeout):
155         """Changes the timeout duration; only has effect during one test.
156         `new_timeout` can be int or float.
157         """
158         self.timer.cancel()
159         self.timer = eventlet.Timeout(new_timeout,
160                                       TestIsTakingTooLong(new_timeout))
161
162     def set_alarm(self, new_timeout):
163         """Call this in the beginning of your test if you expect busy loops.
164         Only has effect during one test.
165         `new_timeout` must be int.
166         """
167         def sig_alarm_handler(sig, frame):
168             # Could arm previous alarm but test is failed anyway
169             # seems to be no point in restoring previous state.
170             raise TestIsTakingTooLong(new_timeout)
171
172         self.previous_alarm = (
173             signal.signal(signal.SIGALRM, sig_alarm_handler),
174             signal.alarm(new_timeout),
175         )
176
177     def tearDown(self):
178         self.timer.cancel()
179         if self.previous_alarm:
180             signal.signal(signal.SIGALRM, self.previous_alarm[0])
181             signal.alarm(self.previous_alarm[1])
182
183         tpool.killall()
184         gc.collect()
185         eventlet.sleep(0)
186         verify_hub_empty()
187
188     def assert_less_than(self, a, b, msg=None):
189         msg = msg or "%s not less than %s" % (a, b)
190         assert a < b, msg
191
192     assertLessThan = assert_less_than
193
194     def assert_less_than_equal(self, a, b, msg=None):
195         msg = msg or "%s not less than or equal to %s" % (a, b)
196         assert a <= b, msg
197
198     assertLessThanEqual = assert_less_than_equal
199
200
201 def check_idle_cpu_usage(duration, allowed_part):
202     if resource is None:
203         # TODO: use https://code.google.com/p/psutil/
204         from nose.plugins.skip import SkipTest
205         raise SkipTest('CPU usage testing not supported (`import resource` failed)')
206
207     r1 = resource.getrusage(resource.RUSAGE_SELF)
208     eventlet.sleep(duration)
209     r2 = resource.getrusage(resource.RUSAGE_SELF)
210     utime = r2.ru_utime - r1.ru_utime
211     stime = r2.ru_stime - r1.ru_stime
212     assert utime + stime < duration * allowed_part, \
213         "CPU usage over limit: user %.0f%% sys %.0f%% allowed %.0f%%" % (
214             utime / duration * 100, stime / duration * 100,
215             allowed_part * 100)
216
217
218 def verify_hub_empty():
219
220     def format_listener(listener):
221         return 'Listener %r for greenlet %r with run callback %r' % (
222             listener, listener.greenlet, getattr(listener.greenlet, 'run', None))
223
224     from eventlet import hubs
225     hub = hubs.get_hub()
226     readers = hub.get_readers()
227     writers = hub.get_writers()
228     num_readers = len(readers)
229     num_writers = len(writers)
230     num_timers = hub.get_timers_count()
231     assert num_readers == 0 and num_writers == 0, \
232         "Readers: %s (%d) Writers: %s (%d)" % (
233             ', '.join(map(format_listener, readers)), num_readers,
234             ', '.join(map(format_listener, writers)), num_writers,
235         )
236
237
238 def find_command(command):
239     for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
240         p = os.path.join(dir, command)
241         if os.access(p, os.X_OK):
242             return p
243     raise IOError(errno.ENOENT, 'Command not found: %r' % command)
244
245
246 def silence_warnings(func):
247     def wrapper(*args, **kw):
248         warnings.simplefilter('ignore', DeprecationWarning)
249         try:
250             return func(*args, **kw)
251         finally:
252             warnings.simplefilter('default', DeprecationWarning)
253     wrapper.__name__ = func.__name__
254     return wrapper
255
256
257 def get_database_auth():
258     """Retrieves a dict of connection parameters for connecting to test databases.
259
260     Authentication parameters are highly-machine specific, so
261     get_database_auth gets its information from either environment
262     variables or a config file.  The environment variable is
263     "EVENTLET_DB_TEST_AUTH" and it should contain a json object.  If
264     this environment variable is present, it's used and config files
265     are ignored.  If it's not present, it looks in the local directory
266     (tests) and in the user's home directory for a file named
267     ".test_dbauth", which contains a json map of parameters to the
268     connect function.
269     """
270     retval = {
271         'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
272         'psycopg2': {'user': 'test'},
273     }
274
275     if 'EVENTLET_DB_TEST_AUTH' in os.environ:
276         return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
277
278     files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
279              os.path.join(os.path.expanduser('~'), '.test_dbauth')]
280     for f in files:
281         try:
282             auth_utf8 = json.load(open(f))
283             # Have to convert unicode objects to str objects because
284             # mysqldb is dumb. Using a doubly-nested list comprehension
285             # because we know that the structure is a two-level dict.
286             return dict(
287                 [(str(modname), dict(
288                     [(str(k), str(v)) for k, v in connectargs.items()]))
289                  for modname, connectargs in auth_utf8.items()])
290         except IOError:
291             pass
292     return retval
293
294
295 def run_python(path):
296     if not path.endswith('.py'):
297         path += '.py'
298     path = os.path.abspath(path)
299     src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
300     new_env = os.environ.copy()
301     new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])
302     p = subprocess.Popen(
303         [sys.executable, path],
304         env=new_env,
305         stderr=subprocess.STDOUT,
306         stdin=subprocess.PIPE,
307         stdout=subprocess.PIPE,
308     )
309     output, _ = p.communicate()
310     return output
311
312
313 def run_isolated(path, prefix='tests/isolated/'):
314     output = run_python(prefix + path).rstrip()
315     if output.startswith(b'skip'):
316         parts = output.split(b':', 1)
317         skip_args = []
318         if len(parts) > 1:
319             skip_args.append(parts[1])
320         raise SkipTest(*skip_args)
321     assert output == b'pass', output
322
323
324 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
325 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')