Add python-eventlet package to MOS 8.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / __init__.py
similarity index 82%
rename from eventlet/tests/__init__.py
rename to python-eventlet/tests/__init__.py
index 3dba2420dea1f4787d243bf0e37b2e84fa7d32ad..26c0c2ec5ab49c5102314dcb7991e9dece9b0bf1 100644 (file)
@@ -1,8 +1,11 @@
 # package is named tests, not test, so it won't be confused with test in stdlib
 from __future__ import print_function
 
+import contextlib
 import errno
+import functools
 import gc
+import json
 import os
 try:
     import resource
@@ -14,6 +17,8 @@ import sys
 import unittest
 import warnings
 
+from nose.plugins.skip import SkipTest
+
 import eventlet
 from eventlet import tpool
 
@@ -22,22 +27,29 @@ from eventlet import tpool
 main = unittest.main
 
 
-def skipped(func):
-    """ Decorator that marks a function as skipped.  Uses nose's SkipTest exception
-    if installed.  Without nose, this will count skipped tests as passing tests."""
+@contextlib.contextmanager
+def assert_raises(exc_type):
     try:
-        from nose.plugins.skip import SkipTest
+        yield
+    except exc_type:
+        pass
+    else:
+        name = str(exc_type)
+        try:
+            name = exc_type.__name__
+        except AttributeError:
+            pass
+        assert False, 'Expected exception {0}'.format(name)
 
-        def skipme(*a, **k):
-            raise SkipTest()
-        skipme.__name__ = func.__name__
-        return skipme
-    except ImportError:
-        # no nose, we'll just skip the test ourselves
-        def skipme(*a, **k):
-            print(("Skipping {0}".format(func.__name__)))
-        skipme.__name__ = func.__name__
-        return skipme
+
+def skipped(func, *decorator_args):
+    """Decorator that marks a function as skipped.
+    """
+    @functools.wraps(func)
+    def wrapped(*a, **k):
+        raise SkipTest(*decorator_args)
+
+    return wrapped
 
 
 def skip_if(condition):
@@ -47,16 +59,16 @@ def skip_if(condition):
     should return True to skip the test.
     """
     def skipped_wrapper(func):
+        @functools.wraps(func)
         def wrapped(*a, **kw):
             if isinstance(condition, bool):
                 result = condition
             else:
                 result = condition(func)
             if result:
-                return skipped(func)(*a, **kw)
+                raise SkipTest()
             else:
                 return func(*a, **kw)
-        wrapped.__name__ = func.__name__
         return wrapped
     return skipped_wrapper
 
@@ -68,16 +80,16 @@ def skip_unless(condition):
     should return True if the condition is satisfied.
     """
     def skipped_wrapper(func):
+        @functools.wraps(func)
         def wrapped(*a, **kw):
             if isinstance(condition, bool):
                 result = condition
             else:
                 result = condition(func)
             if not result:
-                return skipped(func)(*a, **kw)
+                raise SkipTest()
             else:
                 return func(*a, **kw)
-        wrapped.__name__ = func.__name__
         return wrapped
     return skipped_wrapper
 
@@ -204,13 +216,23 @@ def check_idle_cpu_usage(duration, allowed_part):
 
 
 def verify_hub_empty():
+
+    def format_listener(listener):
+        return 'Listener %r for greenlet %r with run callback %r' % (
+            listener, listener.greenlet, getattr(listener.greenlet, 'run', None))
+
     from eventlet import hubs
     hub = hubs.get_hub()
-    num_readers = len(hub.get_readers())
-    num_writers = len(hub.get_writers())
+    readers = hub.get_readers()
+    writers = hub.get_writers()
+    num_readers = len(readers)
+    num_writers = len(writers)
     num_timers = hub.get_timers_count()
-    assert num_readers == 0 and num_writers == 0, "Readers: %s Writers: %s" % (
-        num_readers, num_writers)
+    assert num_readers == 0 and num_writers == 0, \
+        "Readers: %s (%d) Writers: %s (%d)" % (
+            ', '.join(map(format_listener, readers)), num_readers,
+            ', '.join(map(format_listener, writers)), num_writers,
+        )
 
 
 def find_command(command):
@@ -245,19 +267,10 @@ def get_database_auth():
     ".test_dbauth", which contains a json map of parameters to the
     connect function.
     """
-    import os
     retval = {
         'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
         'psycopg2': {'user': 'test'},
     }
-    try:
-        import json
-    except ImportError:
-        try:
-            import simplejson as json
-        except ImportError:
-            print("No json implementation, using baked-in db credentials.")
-            return retval
 
     if 'EVENTLET_DB_TEST_AUTH' in os.environ:
         return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
@@ -268,7 +281,7 @@ def get_database_auth():
         try:
             auth_utf8 = json.load(open(f))
             # Have to convert unicode objects to str objects because
-            # mysqldb is dum. Using a doubly-nested list comprehension
+            # mysqldb is dumb. Using a doubly-nested list comprehension
             # because we know that the structure is a two-level dict.
             return dict(
                 [(str(modname), dict(
@@ -283,9 +296,9 @@ def run_python(path):
     if not path.endswith('.py'):
         path += '.py'
     path = os.path.abspath(path)
-    dir_ = os.path.dirname(path)
+    src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
     new_env = os.environ.copy()
-    new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [dir_])
+    new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])
     p = subprocess.Popen(
         [sys.executable, path],
         env=new_env,
@@ -297,5 +310,16 @@ def run_python(path):
     return output
 
 
+def run_isolated(path, prefix='tests/isolated/'):
+    output = run_python(prefix + path).rstrip()
+    if output.startswith(b'skip'):
+        parts = output.split(b':', 1)
+        skip_args = []
+        if len(parts) > 1:
+            skip_args.append(parts[1])
+        raise SkipTest(*skip_args)
+    assert output == b'pass', output
+
+
 certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
 private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')