Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / __init__.py
1 # package is named tests, not test, so it won't be confused with test in stdlib
2 from __future__ import print_function
3
4 import contextlib
5 import errno
6 import functools
7 import gc
8 import json
9 import os
10 try:
11     import resource
12 except ImportError:
13     resource = None
14 import signal
15 import subprocess
16 import sys
17 import unittest
18 import warnings
19
20 from nose.plugins.skip import SkipTest
21
22 import eventlet
23 from eventlet import tpool
24
25
26 # convenience for importers
27 main = unittest.main
28
29
30 @contextlib.contextmanager
31 def assert_raises(exc_type):
32     try:
33         yield
34     except exc_type:
35         pass
36     else:
37         name = str(exc_type)
38         try:
39             name = exc_type.__name__
40         except AttributeError:
41             pass
42         assert False, 'Expected exception {0}'.format(name)
43
44
45 def skipped(func, *decorator_args):
46     """Decorator that marks a function as skipped.
47     """
48     @functools.wraps(func)
49     def wrapped(*a, **k):
50         raise SkipTest(*decorator_args)
51
52     return wrapped
53
54
55 def skip_if(condition):
56     """ Decorator that skips a test if the *condition* evaluates True.
57     *condition* can be a boolean or a callable that accepts one argument.
58     The callable will be called with the function to be decorated, and
59     should return True to skip the test.
60     """
61     def skipped_wrapper(func):
62         @functools.wraps(func)
63         def wrapped(*a, **kw):
64             if isinstance(condition, bool):
65                 result = condition
66             else:
67                 result = condition(func)
68             if result:
69                 raise SkipTest()
70             else:
71                 return func(*a, **kw)
72         return wrapped
73     return skipped_wrapper
74
75
76 def skip_unless(condition):
77     """ Decorator that skips a test if the *condition* does not return True.
78     *condition* can be a boolean or a callable that accepts one argument.
79     The callable will be called with the  function to be decorated, and
80     should return True if the condition is satisfied.
81     """
82     def skipped_wrapper(func):
83         @functools.wraps(func)
84         def wrapped(*a, **kw):
85             if isinstance(condition, bool):
86                 result = condition
87             else:
88                 result = condition(func)
89             if not result:
90                 raise SkipTest()
91             else:
92                 return func(*a, **kw)
93         return wrapped
94     return skipped_wrapper
95
96
97 def using_pyevent(_f):
98     from eventlet.hubs import get_hub
99     return 'pyevent' in type(get_hub()).__module__
100
101
102 def skip_with_pyevent(func):
103     """ Decorator that skips a test if we're using the pyevent hub."""
104     return skip_if(using_pyevent)(func)
105
106
107 def skip_on_windows(func):
108     """ Decorator that skips a test on Windows."""
109     return skip_if(sys.platform.startswith('win'))(func)
110
111
112 def skip_if_no_itimer(func):
113     """ Decorator that skips a test if the `itimer` module isn't found """
114     has_itimer = False
115     try:
116         import itimer
117         has_itimer = True
118     except ImportError:
119         pass
120     return skip_unless(has_itimer)(func)
121
122
123 def skip_if_no_ssl(func):
124     """ Decorator that skips a test if SSL is not available."""
125     try:
126         import eventlet.green.ssl
127         return func
128     except ImportError:
129         try:
130             import eventlet.green.OpenSSL
131             return func
132         except ImportError:
133             return skipped(func)
134
135
136 class TestIsTakingTooLong(Exception):
137     """ Custom exception class to be raised when a test's runtime exceeds a limit. """
138     pass
139
140
141 class LimitedTestCase(unittest.TestCase):
142     """ Unittest subclass that adds a timeout to all tests.  Subclasses must
143     be sure to call the LimitedTestCase setUp and tearDown methods.  The default
144     timeout is 1 second, change it by setting TEST_TIMEOUT to the desired
145     quantity."""
146
147     TEST_TIMEOUT = 1
148
149     def setUp(self):
150         self.previous_alarm = None
151         self.timer = eventlet.Timeout(self.TEST_TIMEOUT,
152                                       TestIsTakingTooLong(self.TEST_TIMEOUT))
153
154     def reset_timeout(self, new_timeout):
155         """Changes the timeout duration; only has effect during one test.
156         `new_timeout` can be int or float.
157         """
158         self.timer.cancel()
159         self.timer = eventlet.Timeout(new_timeout,
160                                       TestIsTakingTooLong(new_timeout))
161
162     def set_alarm(self, new_timeout):
163         """Call this in the beginning of your test if you expect busy loops.
164         Only has effect during one test.
165         `new_timeout` must be int.
166         """
167         def sig_alarm_handler(sig, frame):
168             # Could arm previous alarm but test is failed anyway
169             # seems to be no point in restoring previous state.
170             raise TestIsTakingTooLong(new_timeout)
171
172         self.previous_alarm = (
173             signal.signal(signal.SIGALRM, sig_alarm_handler),
174             signal.alarm(new_timeout),
175         )
176
177     def tearDown(self):
178         self.timer.cancel()
179         if self.previous_alarm:
180             signal.signal(signal.SIGALRM, self.previous_alarm[0])
181             signal.alarm(self.previous_alarm[1])
182
183         tpool.killall()
184         gc.collect()
185         eventlet.sleep(0)
186         verify_hub_empty()
187
188     def assert_less_than(self, a, b, msg=None):
189         msg = msg or "%s not less than %s" % (a, b)
190         assert a < b, msg
191
192     assertLessThan = assert_less_than
193
194     def assert_less_than_equal(self, a, b, msg=None):
195         msg = msg or "%s not less than or equal to %s" % (a, b)
196         assert a <= b, msg
197
198     assertLessThanEqual = assert_less_than_equal
199
200
201 def check_idle_cpu_usage(duration, allowed_part):
202     if resource is None:
203         # TODO: use https://code.google.com/p/psutil/
204         from nose.plugins.skip import SkipTest
205         raise SkipTest('CPU usage testing not supported (`import resource` failed)')
206
207     r1 = resource.getrusage(resource.RUSAGE_SELF)
208     eventlet.sleep(duration)
209     r2 = resource.getrusage(resource.RUSAGE_SELF)
210     utime = r2.ru_utime - r1.ru_utime
211     stime = r2.ru_stime - r1.ru_stime
212
213     # This check is reliably unreliable on Travis, presumably because of CPU
214     # resources being quite restricted by the build environment. The workaround
215     # is to apply an arbitrary factor that should be enough to make it work nicely.
216     if os.environ.get('TRAVIS') == 'true':
217         allowed_part *= 1.2
218
219     assert utime + stime < duration * allowed_part, \
220         "CPU usage over limit: user %.0f%% sys %.0f%% allowed %.0f%%" % (
221             utime / duration * 100, stime / duration * 100,
222             allowed_part * 100)
223
224
225 def verify_hub_empty():
226
227     def format_listener(listener):
228         return 'Listener %r for greenlet %r with run callback %r' % (
229             listener, listener.greenlet, getattr(listener.greenlet, 'run', None))
230
231     from eventlet import hubs
232     hub = hubs.get_hub()
233     readers = hub.get_readers()
234     writers = hub.get_writers()
235     num_readers = len(readers)
236     num_writers = len(writers)
237     num_timers = hub.get_timers_count()
238     assert num_readers == 0 and num_writers == 0, \
239         "Readers: %s (%d) Writers: %s (%d)" % (
240             ', '.join(map(format_listener, readers)), num_readers,
241             ', '.join(map(format_listener, writers)), num_writers,
242         )
243
244
245 def find_command(command):
246     for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
247         p = os.path.join(dir, command)
248         if os.access(p, os.X_OK):
249             return p
250     raise IOError(errno.ENOENT, 'Command not found: %r' % command)
251
252
253 def silence_warnings(func):
254     def wrapper(*args, **kw):
255         warnings.simplefilter('ignore', DeprecationWarning)
256         try:
257             return func(*args, **kw)
258         finally:
259             warnings.simplefilter('default', DeprecationWarning)
260     wrapper.__name__ = func.__name__
261     return wrapper
262
263
264 def get_database_auth():
265     """Retrieves a dict of connection parameters for connecting to test databases.
266
267     Authentication parameters are highly-machine specific, so
268     get_database_auth gets its information from either environment
269     variables or a config file.  The environment variable is
270     "EVENTLET_DB_TEST_AUTH" and it should contain a json object.  If
271     this environment variable is present, it's used and config files
272     are ignored.  If it's not present, it looks in the local directory
273     (tests) and in the user's home directory for a file named
274     ".test_dbauth", which contains a json map of parameters to the
275     connect function.
276     """
277     retval = {
278         'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
279         'psycopg2': {'user': 'test'},
280     }
281
282     if 'EVENTLET_DB_TEST_AUTH' in os.environ:
283         return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
284
285     files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
286              os.path.join(os.path.expanduser('~'), '.test_dbauth')]
287     for f in files:
288         try:
289             auth_utf8 = json.load(open(f))
290             # Have to convert unicode objects to str objects because
291             # mysqldb is dumb. Using a doubly-nested list comprehension
292             # because we know that the structure is a two-level dict.
293             return dict(
294                 [(str(modname), dict(
295                     [(str(k), str(v)) for k, v in connectargs.items()]))
296                  for modname, connectargs in auth_utf8.items()])
297         except IOError:
298             pass
299     return retval
300
301
302 def run_python(path, env=None, args=None):
303     new_argv = [sys.executable]
304     new_env = os.environ.copy()
305     if path:
306         if not path.endswith('.py'):
307             path += '.py'
308         path = os.path.abspath(path)
309         new_argv.append(path)
310         src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
311         new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])
312     if env:
313         new_env.update(env)
314     if args:
315         new_argv.extend(args)
316     p = subprocess.Popen(
317         new_argv,
318         env=new_env,
319         stderr=subprocess.STDOUT,
320         stdin=subprocess.PIPE,
321         stdout=subprocess.PIPE,
322     )
323     output, _ = p.communicate()
324     return output
325
326
327 def run_isolated(path, prefix='tests/isolated/', env=None, args=None):
328     output = run_python(prefix + path, env=env, args=args).rstrip()
329     if output.startswith(b'skip'):
330         parts = output.split(b':', 1)
331         skip_args = []
332         if len(parts) > 1:
333             skip_args.append(parts[1])
334         raise SkipTest(*skip_args)
335     ok = output == b'pass'
336     if not ok:
337         sys.stderr.write('Isolated test {0} output:\n---\n{1}\n---\n'.format(path, output.decode()))
338     assert ok, 'Expected single line "pass" in stdout'
339
340
341 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
342 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')