Add python-eventlet package to MOS 8.0 repository
[packages/trusty/python-eventlet.git] / eventlet / tests / greenthread_test.py
diff --git a/eventlet/tests/greenthread_test.py b/eventlet/tests/greenthread_test.py
deleted file mode 100644 (file)
index 39b29c7..0000000
+++ /dev/null
@@ -1,164 +0,0 @@
-from tests import LimitedTestCase
-from eventlet import greenthread
-from eventlet.support import greenlets as greenlet
-
-_g_results = []
-
-
-def passthru(*args, **kw):
-    _g_results.append((args, kw))
-    return args, kw
-
-
-def waiter(a):
-    greenthread.sleep(0.1)
-    return a
-
-
-class Asserts(object):
-    def assert_dead(self, gt):
-        if hasattr(gt, 'wait'):
-            self.assertRaises(greenlet.GreenletExit, gt.wait)
-        assert gt.dead
-        assert not gt
-
-
-class Spawn(LimitedTestCase, Asserts):
-    def tearDown(self):
-        global _g_results
-        super(Spawn, self).tearDown()
-        _g_results = []
-
-    def test_simple(self):
-        gt = greenthread.spawn(passthru, 1, b=2)
-        self.assertEqual(gt.wait(), ((1,), {'b': 2}))
-        self.assertEqual(_g_results, [((1,), {'b': 2})])
-
-    def test_n(self):
-        gt = greenthread.spawn_n(passthru, 2, b=3)
-        assert not gt.dead
-        greenthread.sleep(0)
-        assert gt.dead
-        self.assertEqual(_g_results, [((2,), {'b': 3})])
-
-    def test_kill(self):
-        gt = greenthread.spawn(passthru, 6)
-        greenthread.kill(gt)
-        self.assert_dead(gt)
-        greenthread.sleep(0.001)
-        self.assertEqual(_g_results, [])
-        greenthread.kill(gt)
-        self.assert_dead(gt)
-
-    def test_kill_meth(self):
-        gt = greenthread.spawn(passthru, 6)
-        gt.kill()
-        self.assert_dead(gt)
-        greenthread.sleep(0.001)
-        self.assertEqual(_g_results, [])
-        gt.kill()
-        self.assert_dead(gt)
-
-    def test_kill_n(self):
-        gt = greenthread.spawn_n(passthru, 7)
-        greenthread.kill(gt)
-        self.assert_dead(gt)
-        greenthread.sleep(0.001)
-        self.assertEqual(_g_results, [])
-        greenthread.kill(gt)
-        self.assert_dead(gt)
-
-    def test_link(self):
-        results = []
-
-        def link_func(g, *a, **kw):
-            results.append(g)
-            results.append(a)
-            results.append(kw)
-        gt = greenthread.spawn(passthru, 5)
-        gt.link(link_func, 4, b=5)
-        self.assertEqual(gt.wait(), ((5,), {}))
-        self.assertEqual(results, [gt, (4,), {'b': 5}])
-
-    def test_link_after_exited(self):
-        results = []
-
-        def link_func(g, *a, **kw):
-            results.append(g)
-            results.append(a)
-            results.append(kw)
-        gt = greenthread.spawn(passthru, 5)
-        self.assertEqual(gt.wait(), ((5,), {}))
-        gt.link(link_func, 4, b=5)
-        self.assertEqual(results, [gt, (4,), {'b': 5}])
-
-    def test_link_relinks(self):
-        # test that linking in a linked func doesn't cause infinite recursion.
-        called = []
-
-        def link_func(g):
-            g.link(link_func_pass)
-
-        def link_func_pass(g):
-            called.append(True)
-
-        gt = greenthread.spawn(passthru)
-        gt.link(link_func)
-        gt.wait()
-        self.assertEqual(called, [True])
-
-
-class SpawnAfter(Spawn):
-    def test_basic(self):
-        gt = greenthread.spawn_after(0.1, passthru, 20)
-        self.assertEqual(gt.wait(), ((20,), {}))
-
-    def test_cancel(self):
-        gt = greenthread.spawn_after(0.1, passthru, 21)
-        gt.cancel()
-        self.assert_dead(gt)
-
-    def test_cancel_already_started(self):
-        gt = greenthread.spawn_after(0, waiter, 22)
-        greenthread.sleep(0)
-        gt.cancel()
-        self.assertEqual(gt.wait(), 22)
-
-    def test_kill_already_started(self):
-        gt = greenthread.spawn_after(0, waiter, 22)
-        greenthread.sleep(0)
-        gt.kill()
-        self.assert_dead(gt)
-
-
-class SpawnAfterLocal(LimitedTestCase, Asserts):
-    def setUp(self):
-        super(SpawnAfterLocal, self).setUp()
-        self.lst = [1]
-
-    def test_timer_fired(self):
-        def func():
-            greenthread.spawn_after_local(0.1, self.lst.pop)
-            greenthread.sleep(0.2)
-
-        greenthread.spawn(func)
-        assert self.lst == [1], self.lst
-        greenthread.sleep(0.3)
-        assert self.lst == [], self.lst
-
-    def test_timer_cancelled_upon_greenlet_exit(self):
-        def func():
-            greenthread.spawn_after_local(0.1, self.lst.pop)
-
-        greenthread.spawn(func)
-        assert self.lst == [1], self.lst
-        greenthread.sleep(0.2)
-        assert self.lst == [1], self.lst
-
-    def test_spawn_is_not_cancelled(self):
-        def func():
-            greenthread.spawn(self.lst.pop)
-            # exiting immediatelly, but self.lst.pop must be called
-        greenthread.spawn(func)
-        greenthread.sleep(0.1)
-        assert self.lst == [], self.lst