--- /dev/null
+# package is named tests, not test, so it won't be confused with test in stdlib
+from __future__ import print_function
+
+import errno
+import gc
+import os
+try:
+ import resource
+except ImportError:
+ resource = None
+import signal
+import subprocess
+import sys
+import unittest
+import warnings
+
+import eventlet
+from eventlet import tpool
+
+
+# convenience for importers
+main = unittest.main
+
+
+def skipped(func):
+ """ Decorator that marks a function as skipped. Uses nose's SkipTest exception
+ if installed. Without nose, this will count skipped tests as passing tests."""
+ try:
+ from nose.plugins.skip import SkipTest
+
+ def skipme(*a, **k):
+ raise SkipTest()
+ skipme.__name__ = func.__name__
+ return skipme
+ except ImportError:
+ # no nose, we'll just skip the test ourselves
+ def skipme(*a, **k):
+ print(("Skipping {0}".format(func.__name__)))
+ skipme.__name__ = func.__name__
+ return skipme
+
+
+def skip_if(condition):
+ """ Decorator that skips a test if the *condition* evaluates True.
+ *condition* can be a boolean or a callable that accepts one argument.
+ The callable will be called with the function to be decorated, and
+ should return True to skip the test.
+ """
+ def skipped_wrapper(func):
+ def wrapped(*a, **kw):
+ if isinstance(condition, bool):
+ result = condition
+ else:
+ result = condition(func)
+ if result:
+ return skipped(func)(*a, **kw)
+ else:
+ return func(*a, **kw)
+ wrapped.__name__ = func.__name__
+ return wrapped
+ return skipped_wrapper
+
+
+def skip_unless(condition):
+ """ Decorator that skips a test if the *condition* does not return True.
+ *condition* can be a boolean or a callable that accepts one argument.
+ The callable will be called with the function to be decorated, and
+ should return True if the condition is satisfied.
+ """
+ def skipped_wrapper(func):
+ def wrapped(*a, **kw):
+ if isinstance(condition, bool):
+ result = condition
+ else:
+ result = condition(func)
+ if not result:
+ return skipped(func)(*a, **kw)
+ else:
+ return func(*a, **kw)
+ wrapped.__name__ = func.__name__
+ return wrapped
+ return skipped_wrapper
+
+
+def using_pyevent(_f):
+ from eventlet.hubs import get_hub
+ return 'pyevent' in type(get_hub()).__module__
+
+
+def skip_with_pyevent(func):
+ """ Decorator that skips a test if we're using the pyevent hub."""
+ return skip_if(using_pyevent)(func)
+
+
+def skip_on_windows(func):
+ """ Decorator that skips a test on Windows."""
+ return skip_if(sys.platform.startswith('win'))(func)
+
+
+def skip_if_no_itimer(func):
+ """ Decorator that skips a test if the `itimer` module isn't found """
+ has_itimer = False
+ try:
+ import itimer
+ has_itimer = True
+ except ImportError:
+ pass
+ return skip_unless(has_itimer)(func)
+
+
+def skip_if_no_ssl(func):
+ """ Decorator that skips a test if SSL is not available."""
+ try:
+ import eventlet.green.ssl
+ return func
+ except ImportError:
+ try:
+ import eventlet.green.OpenSSL
+ return func
+ except ImportError:
+ return skipped(func)
+
+
+class TestIsTakingTooLong(Exception):
+ """ Custom exception class to be raised when a test's runtime exceeds a limit. """
+ pass
+
+
+class LimitedTestCase(unittest.TestCase):
+ """ Unittest subclass that adds a timeout to all tests. Subclasses must
+ be sure to call the LimitedTestCase setUp and tearDown methods. The default
+ timeout is 1 second, change it by setting TEST_TIMEOUT to the desired
+ quantity."""
+
+ TEST_TIMEOUT = 1
+
+ def setUp(self):
+ self.previous_alarm = None
+ self.timer = eventlet.Timeout(self.TEST_TIMEOUT,
+ TestIsTakingTooLong(self.TEST_TIMEOUT))
+
+ def reset_timeout(self, new_timeout):
+ """Changes the timeout duration; only has effect during one test.
+ `new_timeout` can be int or float.
+ """
+ self.timer.cancel()
+ self.timer = eventlet.Timeout(new_timeout,
+ TestIsTakingTooLong(new_timeout))
+
+ def set_alarm(self, new_timeout):
+ """Call this in the beginning of your test if you expect busy loops.
+ Only has effect during one test.
+ `new_timeout` must be int.
+ """
+ def sig_alarm_handler(sig, frame):
+ # Could arm previous alarm but test is failed anyway
+ # seems to be no point in restoring previous state.
+ raise TestIsTakingTooLong(new_timeout)
+
+ self.previous_alarm = (
+ signal.signal(signal.SIGALRM, sig_alarm_handler),
+ signal.alarm(new_timeout),
+ )
+
+ def tearDown(self):
+ self.timer.cancel()
+ if self.previous_alarm:
+ signal.signal(signal.SIGALRM, self.previous_alarm[0])
+ signal.alarm(self.previous_alarm[1])
+
+ tpool.killall()
+ gc.collect()
+ eventlet.sleep(0)
+ verify_hub_empty()
+
+ def assert_less_than(self, a, b, msg=None):
+ msg = msg or "%s not less than %s" % (a, b)
+ assert a < b, msg
+
+ assertLessThan = assert_less_than
+
+ def assert_less_than_equal(self, a, b, msg=None):
+ msg = msg or "%s not less than or equal to %s" % (a, b)
+ assert a <= b, msg
+
+ assertLessThanEqual = assert_less_than_equal
+
+
+def check_idle_cpu_usage(duration, allowed_part):
+ if resource is None:
+ # TODO: use https://code.google.com/p/psutil/
+ from nose.plugins.skip import SkipTest
+ raise SkipTest('CPU usage testing not supported (`import resource` failed)')
+
+ r1 = resource.getrusage(resource.RUSAGE_SELF)
+ eventlet.sleep(duration)
+ r2 = resource.getrusage(resource.RUSAGE_SELF)
+ utime = r2.ru_utime - r1.ru_utime
+ stime = r2.ru_stime - r1.ru_stime
+ assert utime + stime < duration * allowed_part, \
+ "CPU usage over limit: user %.0f%% sys %.0f%% allowed %.0f%%" % (
+ utime / duration * 100, stime / duration * 100,
+ allowed_part * 100)
+
+
+def verify_hub_empty():
+ from eventlet import hubs
+ hub = hubs.get_hub()
+ num_readers = len(hub.get_readers())
+ num_writers = len(hub.get_writers())
+ num_timers = hub.get_timers_count()
+ assert num_readers == 0 and num_writers == 0, "Readers: %s Writers: %s" % (
+ num_readers, num_writers)
+
+
+def find_command(command):
+ for dir in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
+ p = os.path.join(dir, command)
+ if os.access(p, os.X_OK):
+ return p
+ raise IOError(errno.ENOENT, 'Command not found: %r' % command)
+
+
+def silence_warnings(func):
+ def wrapper(*args, **kw):
+ warnings.simplefilter('ignore', DeprecationWarning)
+ try:
+ return func(*args, **kw)
+ finally:
+ warnings.simplefilter('default', DeprecationWarning)
+ wrapper.__name__ = func.__name__
+ return wrapper
+
+
+def get_database_auth():
+ """Retrieves a dict of connection parameters for connecting to test databases.
+
+ Authentication parameters are highly-machine specific, so
+ get_database_auth gets its information from either environment
+ variables or a config file. The environment variable is
+ "EVENTLET_DB_TEST_AUTH" and it should contain a json object. If
+ this environment variable is present, it's used and config files
+ are ignored. If it's not present, it looks in the local directory
+ (tests) and in the user's home directory for a file named
+ ".test_dbauth", which contains a json map of parameters to the
+ connect function.
+ """
+ import os
+ retval = {
+ 'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
+ 'psycopg2': {'user': 'test'},
+ }
+ try:
+ import json
+ except ImportError:
+ try:
+ import simplejson as json
+ except ImportError:
+ print("No json implementation, using baked-in db credentials.")
+ return retval
+
+ if 'EVENTLET_DB_TEST_AUTH' in os.environ:
+ return json.loads(os.environ.get('EVENTLET_DB_TEST_AUTH'))
+
+ files = [os.path.join(os.path.dirname(__file__), '.test_dbauth'),
+ os.path.join(os.path.expanduser('~'), '.test_dbauth')]
+ for f in files:
+ try:
+ auth_utf8 = json.load(open(f))
+ # Have to convert unicode objects to str objects because
+ # mysqldb is dum. Using a doubly-nested list comprehension
+ # because we know that the structure is a two-level dict.
+ return dict(
+ [(str(modname), dict(
+ [(str(k), str(v)) for k, v in connectargs.items()]))
+ for modname, connectargs in auth_utf8.items()])
+ except IOError:
+ pass
+ return retval
+
+
+def run_python(path):
+ if not path.endswith('.py'):
+ path += '.py'
+ path = os.path.abspath(path)
+ dir_ = os.path.dirname(path)
+ new_env = os.environ.copy()
+ new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [dir_])
+ p = subprocess.Popen(
+ [sys.executable, path],
+ env=new_env,
+ stderr=subprocess.STDOUT,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ )
+ output, _ = p.communicate()
+ return output
+
+
+certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
+private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')