Add python-eventlet package to MOS 8.0 repository
[packages/trusty/python-eventlet.git] / eventlet / tests / api_test.py
diff --git a/eventlet/tests/api_test.py b/eventlet/tests/api_test.py
deleted file mode 100644 (file)
index f9636da..0000000
+++ /dev/null
@@ -1,187 +0,0 @@
-import os
-from unittest import TestCase, main
-
-from nose.tools import eq_
-
-import eventlet
-from eventlet import greenio, hubs, greenthread, spawn
-from eventlet.green import ssl
-from tests import skip_if_no_ssl
-
-
-def check_hub():
-    # Clear through the descriptor queue
-    eventlet.sleep(0)
-    eventlet.sleep(0)
-    hub = hubs.get_hub()
-    for nm in 'get_readers', 'get_writers':
-        dct = getattr(hub, nm)()
-        assert not dct, "hub.%s not empty: %s" % (nm, dct)
-    hub.abort(True)
-    assert not hub.running
-
-
-class TestApi(TestCase):
-
-    certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
-    private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
-
-    def test_tcp_listener(self):
-        socket = eventlet.listen(('0.0.0.0', 0))
-        assert socket.getsockname()[0] == '0.0.0.0'
-        socket.close()
-
-        check_hub()
-
-    def test_connect_tcp(self):
-        def accept_once(listenfd):
-            try:
-                conn, addr = listenfd.accept()
-                fd = conn.makefile(mode='wb')
-                conn.close()
-                fd.write(b'hello\n')
-                fd.close()
-            finally:
-                listenfd.close()
-
-        server = eventlet.listen(('0.0.0.0', 0))
-        eventlet.spawn_n(accept_once, server)
-
-        client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
-        fd = client.makefile('rb')
-        client.close()
-        eq_(fd.readline(), b'hello\n')
-        eq_(fd.read(), b'')
-        fd.close()
-
-        check_hub()
-
-    @skip_if_no_ssl
-    def test_connect_ssl(self):
-        def accept_once(listenfd):
-            try:
-                conn, addr = listenfd.accept()
-                conn.write(b'hello\r\n')
-                greenio.shutdown_safe(conn)
-                conn.close()
-            finally:
-                greenio.shutdown_safe(listenfd)
-                listenfd.close()
-
-        server = eventlet.wrap_ssl(
-            eventlet.listen(('0.0.0.0', 0)),
-            self.private_key_file,
-            self.certificate_file,
-            server_side=True
-        )
-        eventlet.spawn_n(accept_once, server)
-
-        raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
-        client = ssl.wrap_socket(raw_client)
-        fd = client.makefile('rb', 8192)
-
-        assert fd.readline() == b'hello\r\n'
-        try:
-            self.assertEqual(b'', fd.read(10))
-        except greenio.SSL.ZeroReturnError:
-            # if it's a GreenSSL object it'll do this
-            pass
-        greenio.shutdown_safe(client)
-        client.close()
-
-        check_hub()
-
-    def test_001_trampoline_timeout(self):
-        server_sock = eventlet.listen(('127.0.0.1', 0))
-        bound_port = server_sock.getsockname()[1]
-
-        def server(sock):
-            client, addr = sock.accept()
-            eventlet.sleep(0.1)
-        server_evt = spawn(server, server_sock)
-        eventlet.sleep(0)
-        try:
-            desc = eventlet.connect(('127.0.0.1', bound_port))
-            hubs.trampoline(desc, read=True, write=False, timeout=0.001)
-        except eventlet.TimeoutError:
-            pass  # test passed
-        else:
-            assert False, "Didn't timeout"
-
-        server_evt.wait()
-        check_hub()
-
-    def test_timeout_cancel(self):
-        server = eventlet.listen(('0.0.0.0', 0))
-        bound_port = server.getsockname()[1]
-
-        done = [False]
-
-        def client_closer(sock):
-            while True:
-                (conn, addr) = sock.accept()
-                conn.close()
-
-        def go():
-            desc = eventlet.connect(('127.0.0.1', bound_port))
-            try:
-                hubs.trampoline(desc, read=True, timeout=0.1)
-            except eventlet.TimeoutError:
-                assert False, "Timed out"
-
-            server.close()
-            desc.close()
-            done[0] = True
-
-        greenthread.spawn_after_local(0, go)
-
-        server_coro = eventlet.spawn(client_closer, server)
-        while not done[0]:
-            eventlet.sleep(0)
-        eventlet.kill(server_coro)
-
-        check_hub()
-
-    def test_killing_dormant(self):
-        DELAY = 0.1
-        state = []
-
-        def test():
-            try:
-                state.append('start')
-                eventlet.sleep(DELAY)
-            except:
-                state.append('except')
-                # catching GreenletExit
-                pass
-            # when switching to hub, hub makes itself the parent of this greenlet,
-            # thus after the function's done, the control will go to the parent
-            eventlet.sleep(0)
-            state.append('finished')
-
-        g = eventlet.spawn(test)
-        eventlet.sleep(DELAY / 2)
-        self.assertEqual(state, ['start'])
-        eventlet.kill(g)
-        # will not get there, unless switching is explicitly scheduled by kill
-        self.assertEqual(state, ['start', 'except'])
-        eventlet.sleep(DELAY)
-        self.assertEqual(state, ['start', 'except', 'finished'])
-
-    def test_nested_with_timeout(self):
-        def func():
-            return eventlet.with_timeout(0.2, eventlet.sleep, 2, timeout_value=1)
-
-        try:
-            eventlet.with_timeout(0.1, func)
-            self.fail(u'Expected TimeoutError')
-        except eventlet.TimeoutError:
-            pass
-
-
-class Foo(object):
-    pass
-
-
-if __name__ == '__main__':
-    main()