Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / __init__.py
1 # package is named tests, not test, so it won't be confused with test in stdlib
2 from __future__ import print_function
3
4 import errno
5 import gc
6 import os
7 try:
8     import resource
9 except ImportError:
10     resource = None
11 import signal
12 import subprocess
13 import sys
14 import unittest
15 import warnings
16
17 import eventlet
18 from eventlet import tpool
19
20
21 # convenience for importers
22 main = unittest.main
23
24
25 def skipped(func):
26     """ Decorator that marks a function as skipped.  Uses nose's SkipTest exception
27     if installed.  Without nose, this will count skipped tests as passing tests."""
28     try:
29         from nose.plugins.skip import SkipTest
30
31         def skipme(*a, **k):
32             raise SkipTest()
33         skipme.__name__ = func.__name__
34         return skipme
35     except ImportError:
36         # no nose, we'll just skip the test ourselves
37         def skipme(*a, **k):
38             print(("Skipping {0}".format(func.__name__)))
39         skipme.__name__ = func.__name__
40         return skipme
41
42
43 def skip_if(condition):
44     """ Decorator that skips a test if the *condition* evaluates True.
45     *condition* can be a boolean or a callable that accepts one argument.
46     The callable will be called with the function to be decorated, and
47     should return True to skip the test.
48     """
49     def skipped_wrapper(func):
50         def wrapped(*a, **kw):
51             if isinstance(condition, bool):
52                 result = condition
53             else:
54                 result = condition(func)
55             if result:
56                 return skipped(func)(*a, **kw)
57             else:
58                 return func(*a, **kw)
59         wrapped.__name__ = func.__name__
60         return wrapped
61     return skipped_wrapper
62
63
64 def skip_unless(condition):
65     """ Decorator that skips a test if the *condition* does not return True.
66     *condition* can be a boolean or a callable that accepts one argument.
67     The callable will be called with the  function to be decorated, and
68     should return True if the condition is satisfied.
69     """
70     def skipped_wrapper(func):
71         def wrapped(*a, **kw):
72             if isinstance(condition, bool):
73                 result = condition
74             else:
75                 result = condition(func)
76             if not result:
77                 return skipped(func)(*a, **kw)
78             else:
79                 return func(*a, **kw)
80         wrapped.__name__ = func.__name__
81         return wrapped
82     return skipped_wrapper
83
84
85 def requires_twisted(func):
86     """ Decorator that skips a test if Twisted is not present."""
87     def requirement(_f):
88         from eventlet.hubs import get_hub
89         try:
90             return 'Twisted' in type(get_hub()).__name__
91         except Exception:
92             return False
93     return skip_unless(requirement)(func)
94
95
96 def using_pyevent(_f):
97     from eventlet.hubs import get_hub
98     return 'pyevent' in type(get_hub()).__module__
99
100
101 def skip_with_pyevent(func):
102     """ Decorator that skips a test if we're using the pyevent hub."""
103     return skip_if(using_pyevent)(func)
104
105
106 def skip_on_windows(func):
107     """ Decorator that skips a test on Windows."""
108     return skip_if(sys.platform.startswith('win'))(func)
109
110
111 def skip_if_no_itimer(func):
112     """ Decorator that skips a test if the `itimer` module isn't found """
113     has_itimer = False
114     try:
115         import itimer
116         has_itimer = True
117     except ImportError:
118         pass
119     return skip_unless(has_itimer)(func)
120
121
122 def skip_if_no_ssl(func):
123     """ Decorator that skips a test if SSL is not available."""
124     try:
125         import eventlet.green.ssl
126         return func
127     except ImportError:
128         try:
129             import eventlet.green.OpenSSL
130             return func
131         except ImportError:
132             return skipped(func)
133
134
135 class TestIsTakingTooLong(Exception):
136     """ Custom exception class to be raised when a test's runtime exceeds a limit. """
137     pass
138
139
140 class LimitedTestCase(unittest.TestCase):
141     """ Unittest subclass that adds a timeout to all tests.  Subclasses must
142     be sure to call the LimitedTestCase setUp and tearDown methods.  The default
143     timeout is 1 second, change it by setting TEST_TIMEOUT to the desired
144     quantity."""
145
146     TEST_TIMEOUT = 1
147
148     def setUp(self):
149         self.previous_alarm = None
150         self.timer = eventlet.Timeout(self.TEST_TIMEOUT,
151                                       TestIsTakingTooLong(self.TEST_TIMEOUT))
152
153     def reset_timeout(self, new_timeout):
154         """Changes the timeout duration; only has effect during one test.
155         `new_timeout` can be int or float.
156         """
157         self.timer.cancel()
158         self.timer = eventlet.Timeout(new_timeout,
159                                       TestIsTakingTooLong(new_timeout))
160
161     def set_alarm(self, new_timeout):
162         """Call this in the beginning of your test if you expect busy loops.
163         Only has effect during one test.
164         `new_timeout` must be int.
165         """
166         def sig_alarm_handler(sig, frame):
167             # Could arm previous alarm but test is failed anyway
168             # seems to be no point in restoring previous state.
169             raise TestIsTakingTooLong(new_timeout)
170
171         self.previous_alarm = (
172             signal.signal(signal.SIGALRM, sig_alarm_handler),
173             signal.alarm(new_timeout),
174         )
175
176     def tearDown(self):
177         self.timer.cancel()
178         if self.previous_alarm:
179             signal.signal(signal.SIGALRM, self.previous_alarm[0])
180             signal.alarm(self.previous_alarm[1])
181
182         tpool.killall()
183         gc.collect()
184         eventlet.sleep(0)
185         verify_hub_empty()
186
187     def assert_less_than(self, a, b, msg=None):
188         msg = msg or "%s not less than %s" % (a, b)
189         assert a < b, msg
190
191     assertLessThan = assert_less_than
192
193     def assert_less_than_equal(self, a, b, msg=None):
194         msg = msg or "%s not less than or equal to %s" % (a, b)
195         assert a <= b, msg
196
197     assertLessThanEqual = assert_less_than_equal
198
199
200 def check_idle_cpu_usage(duration, allowed_part):
201     if resource is None:
202         # TODO: use https://code.google.com/p/psutil/
203         from nose.plugins.skip import SkipTest
204         raise SkipTest('CPU usage testing not supported (`import resource` failed)')
205
206     r1 = resource.getrusage(resource.RUSAGE_SELF)
207     eventlet.sleep(duration)
208     r2 = resource.getrusage(resource.RUSAGE_SELF)
209     utime = r2.ru_utime - r1.ru_utime
210     stime = r2.ru_stime - r1.ru_stime
211     assert utime + stime < duration * allowed_part, \
212         "CPU usage over limit: user %.0f%% sys %.0f%% allowed %.0f%%" % (
213             utime / duration * 100, stime / duration * 100,
214             allowed_part * 100)
215
216
217 def verify_hub_empty():
218     from eventlet import hubs
219     hub = hubs.get_hub()
220     num_readers = len(hub.get_readers())
221     num_writers = len(hub.get_writers())
222     num_timers = hub.get_timers_count()
223     assert num_readers == 0 and num_writers == 0, "Readers: %s Writers: %s" % (num_readers, num_writers)
224
225
226 def find_command(command):
227     for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
228         p = os.path.join(dir, command)
229         if os.access(p, os.X_OK):
230             return p
231     raise IOError(errno.ENOENT, 'Command not found: %r' % command)
232
233
234 def silence_warnings(func):
235     def wrapper(*args, **kw):
236         warnings.simplefilter('ignore', DeprecationWarning)
237         try:
238             return func(*args, **kw)
239         finally:
240             warnings.simplefilter('default', DeprecationWarning)
241     wrapper.__name__ = func.__name__
242     return wrapper
243
244
245 def get_database_auth():
246     """Retrieves a dict of connection parameters for connecting to test databases.
247
248     Authentication parameters are highly-machine specific, so
249     get_database_auth gets its information from either environment
250     variables or a config file.  The environment variable is
251     "EVENTLET_DB_TEST_AUTH" and it should contain a json object.  If
252     this environment variable is present, it's used and config files
253     are ignored.  If it's not present, it looks in the local directory
254     (tests) and in the user's home directory for a file named
255     ".test_dbauth", which contains a json map of parameters to the
256     connect function.
257     """
258     import os
259     retval = {
260         'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
261         'psycopg2': {'user': 'test'},
262     }
263     try:
264         import json
265     except ImportError:
266         try:
267             import simplejson as json
268         except ImportError:
269             print("No json implementation, using baked-in db credentials.")
270             return retval
271
272     if 'EVENTLET_DB_TEST_AUTH' in os.environ:
273         return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
274
275     files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
276              os.path.join(os.path.expanduser('~'), '.test_dbauth')]
277     for f in files:
278         try:
279             auth_utf8 = json.load(open(f))
280             # Have to convert unicode objects to str objects because
281             # mysqldb is dum. Using a doubly-nested list comprehension
282             # because we know that the structure is a two-level dict.
283             return dict(
284                 [(str(modname), dict(
285                     [(str(k), str(v)) for k, v in connectargs.items()]))
286                  for modname, connectargs in auth_utf8.items()])
287         except IOError:
288             pass
289     return retval
290
291
292 def run_python(path):
293     if not path.endswith('.py'):
294         path += '.py'
295     path = os.path.abspath(path)
296     dir_ = os.path.dirname(path)
297     new_env = os.environ.copy()
298     new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [dir_])
299     p = subprocess.Popen(
300         [sys.executable, path],
301         env=new_env,
302         stderr=subprocess.STDOUT,
303         stdin=subprocess.PIPE,
304         stdout=subprocess.PIPE,
305     )
306     output, _ = p.communicate()
307     return output
308
309
310 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
311 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')