Add python-eventlet 0.16.1
diff --git a/eventlet/tests/db_pool_test.py b/eventlet/tests/db_pool_test.py
new file mode 100644
index 0000000..9fc9ebc
--- /dev/null
+++ b/eventlet/tests/db_pool_test.py
@@ -0,0 +1,732 @@
+'''Test cases for db_pool
+'''
+from __future__ import print_function
+
+import sys
+import os
+import traceback
+from unittest import TestCase, main
+
+from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
+from eventlet import event
+from eventlet import db_pool
+from eventlet.support import six
+import eventlet
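+
+# A rough sketch of the pool API these tests exercise.  RawConnectionPool is
+# real; the MySQLdb auth values below are only an assumed example:
+#
+#     import MySQLdb
+#     pool = db_pool.RawConnectionPool(MySQLdb, max_size=4,
+#                                      host='localhost', user='root', passwd='')
+#     conn = pool.get()           # check a connection out (may create one)
+#     try:
+#         conn.cursor().execute('select 1')
+#     finally:
+#         pool.put(conn)          # return it to free_items after cleanup
+#     pool.clear()                # close all idle connections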
+
+
+class DBTester(object):
+    __test__ = False  # so that nose doesn't try to execute this directly
+
+    def setUp(self):
+        self.create_db()
+        self.connection = None
+        connection = self._dbmodule.connect(**self._auth)
+        cursor = connection.cursor()
+        cursor.execute("""CREATE  TABLE gargleblatz
+        (
+        a INTEGER
+        );""")
+        connection.commit()
+        cursor.close()
+        connection.close()
+
+    def tearDown(self):
+        if self.connection:
+            self.connection.close()
+        self.drop_db()
+
+    def set_up_dummy_table(self, connection=None):
+        # Only close the connection afterwards if this method opened it
+        # itself; a connection passed in or borrowed from self stays open.
+        close_connection = False
+        if connection is None:
+            if self.connection is None:
+                connection = self._dbmodule.connect(**self._auth)
+                close_connection = True
+            else:
+                connection = self.connection
+
+        cursor = connection.cursor()
+        cursor.execute(self.dummy_table_sql)
+        connection.commit()
+        cursor.close()
+        if close_connection:
+            connection.close()
+
+
+# silly mock class
+class Mock(object):
+    pass
+
+
+class DBConnectionPool(DBTester):
+    __test__ = False  # so that nose doesn't try to execute this directly
+
+    def setUp(self):
+        super(DBConnectionPool, self).setUp()
+        self.pool = self.create_pool()
+        self.connection = self.pool.get()
+
+    def tearDown(self):
+        if self.connection:
+            self.pool.put(self.connection)
+        self.pool.clear()
+        super(DBConnectionPool, self).tearDown()
+
+    def assert_cursor_works(self, cursor):
+        cursor.execute("select 1")
+        rows = cursor.fetchall()
+        assert rows
+
+    def test_connecting(self):
+        assert self.connection is not None
+
+    def test_create_cursor(self):
+        cursor = self.connection.cursor()
+        cursor.close()
+
+    def test_run_query(self):
+        cursor = self.connection.cursor()
+        self.assert_cursor_works(cursor)
+        cursor.close()
+
+    def test_run_bad_query(self):
+        cursor = self.connection.cursor()
+        try:
+            cursor.execute("garbage blah blah")
+            assert False
+        except AssertionError:
+            raise
+        except Exception:
+            pass
+        cursor.close()
+
+    def test_put_none(self):
+        # the pool is of size 1, and its only connection is out
+        assert self.pool.free() == 0
+        self.pool.put(None)
+        # ha ha we fooled it into thinking that we had a dead connection
+        assert self.pool.free() == 1
+        conn2 = self.pool.get()
+        assert conn2 is not None
+        assert conn2.cursor
+        self.pool.put(conn2)
+
+    def test_close_does_a_put(self):
+        assert self.pool.free() == 0
+        self.connection.close()
+        assert self.pool.free() == 1
+        self.assertRaises(AttributeError, self.connection.cursor)
+
+    @skipped
+    def test_deletion_does_a_put(self):
+        # doing a put on del causes some issues if __del__ is called in the
+        # main coroutine, so, not doing that for now
+        assert self.pool.free() == 0
+        self.connection = None
+        assert self.pool.free() == 1
+
+    def test_put_doesnt_double_wrap(self):
+        self.pool.put(self.connection)
+        conn = self.pool.get()
+        assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
+        self.pool.put(conn)
+
+    def test_bool(self):
+        assert self.connection
+        self.connection.close()
+        assert not self.connection
+
+    def fill_up_table(self, conn):
+        curs = conn.cursor()
+        for i in six.moves.range(1000):
+            curs.execute('insert into test_table (value_int) values (%s)' % i)
+        conn.commit()
+
+    def test_returns_immediately(self):
+        self.pool = self.create_pool()
+        conn = self.pool.get()
+        self.set_up_dummy_table(conn)
+        self.fill_up_table(conn)
+        curs = conn.cursor()
+        results = []
+        SHORT_QUERY = "select * from test_table"
+        evt = event.Event()
+
+        def a_query():
+            self.assert_cursor_works(curs)
+            curs.execute(SHORT_QUERY)
+            results.append(2)
+            evt.send()
+        eventlet.spawn(a_query)
+        results.append(1)
+        self.assertEqual([1], results)
+        evt.wait()
+        self.assertEqual([1, 2], results)
+        self.pool.put(conn)
+
+    def test_connection_is_clean_after_put(self):
+        self.pool = self.create_pool()
+        conn = self.pool.get()
+        self.set_up_dummy_table(conn)
+        curs = conn.cursor()
+        for i in range(10):
+            curs.execute('insert into test_table (value_int) values (%s)' % i)
+        # do not commit  :-)
+        self.pool.put(conn)
+        del conn
+        conn2 = self.pool.get()
+        curs2 = conn2.cursor()
+        for i in range(10):
+            curs2.execute('insert into test_table (value_int) values (%s)' % i)
+        conn2.commit()
+        curs2.execute("select * from test_table")
+        # we should have only inserted them once
+        self.assertEqual(10, curs2.rowcount)
+        self.pool.put(conn2)
+
+    def test_visibility_from_other_connections(self):
+        self.pool = self.create_pool(max_size=3)
+        conn = self.pool.get()
+        conn2 = self.pool.get()
+        curs = conn.cursor()
+        try:
+            curs2 = conn2.cursor()
+            curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
+            self.assertEqual(curs2.rowcount, 1)
+            conn2.commit()
+            selection_query = "select * from gargleblatz"
+            curs2.execute(selection_query)
+            self.assertEqual(curs2.rowcount, 1)
+            del curs2
+            self.pool.put(conn2)
+            # create a new connection, it should see the addition
+            conn3 = self.pool.get()
+            curs3 = conn3.cursor()
+            curs3.execute(selection_query)
+            self.assertEqual(curs3.rowcount, 1)
+            # now, does the already-open connection see it?
+            curs.execute(selection_query)
+            self.assertEqual(curs.rowcount, 1)
+            self.pool.put(conn3)
+        finally:
+            # clean up my litter
+            curs.execute("delete from gargleblatz where a=314159")
+            conn.commit()
+            self.pool.put(conn)
+
+    @skipped
+    def test_two_simultaneous_connections(self):
+        # timing-sensitive test, disabled until we come up with a better
+        # way to do this
+        self.pool = self.create_pool(max_size=2)
+        conn = self.pool.get()
+        self.set_up_dummy_table(conn)
+        self.fill_up_table(conn)
+        curs = conn.cursor()
+        conn2 = self.pool.get()
+        self.set_up_dummy_table(conn2)
+        self.fill_up_table(conn2)
+        curs2 = conn2.cursor()
+        results = []
+        LONG_QUERY = "select * from test_table"
+        SHORT_QUERY = "select * from test_table where row_id <= 20"
+
+        evt = event.Event()
+
+        def long_running_query():
+            self.assert_cursor_works(curs)
+            curs.execute(LONG_QUERY)
+            results.append(1)
+            evt.send()
+        evt2 = event.Event()
+
+        def short_running_query():
+            self.assert_cursor_works(curs2)
+            curs2.execute(SHORT_QUERY)
+            results.append(2)
+            evt2.send()
+
+        eventlet.spawn(long_running_query)
+        eventlet.spawn(short_running_query)
+        evt.wait()
+        evt2.wait()
+        results.sort()
+        self.assertEqual([1, 2], results)
+
+    def test_clear(self):
+        self.pool = self.create_pool()
+        self.pool.put(self.connection)
+        self.pool.clear()
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    def test_clear_warmup(self):
+        """Clear implicitly created connections (min_size > 0)"""
+        self.pool = self.create_pool(min_size=1)
+        self.pool.clear()
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    def test_unwrap_connection(self):
+        assert isinstance(self.connection, db_pool.GenericConnectionWrapper)
+        conn = self.pool._unwrap_connection(self.connection)
+        assert not isinstance(conn, db_pool.GenericConnectionWrapper)
+
+        self.assertEqual(None, self.pool._unwrap_connection(None))
+        self.assertEqual(None, self.pool._unwrap_connection(1))
+
+        # testing duck typing here -- as long as the connection has a
+        # _base attribute, it should be unwrappable
+        x = Mock()
+        x._base = 'hi'
+        self.assertEqual('hi', self.pool._unwrap_connection(x))
+        conn.close()
+
+    def test_safe_close(self):
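+        # _safe_close should swallow ordinary errors from close() but let
+        # fatal exceptions such as KeyboardInterrupt propagate.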
+        self.pool._safe_close(self.connection, quiet=True)
+        self.assertEqual(len(self.pool.free_items), 1)
+
+        self.pool._safe_close(None)
+        self.pool._safe_close(1)
+
+        # now we're really going for 100% coverage
+        x = Mock()
+
+        def fail():
+            raise KeyboardInterrupt()
+        x.close = fail
+        self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
+
+        x = Mock()
+
+        def fail2():
+            raise RuntimeError("if this line has been printed, the test succeeded")
+        x.close = fail2
+        self.pool._safe_close(x, quiet=False)
+
+    def test_zero_max_idle(self):
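+        # max_idle=0 means a returned connection is discarded immediately,
+        # so nothing should remain in free_items after close().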
+        self.pool.put(self.connection)
+        self.pool.clear()
+        self.pool = self.create_pool(max_size=2, max_idle=0)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    def test_zero_max_age(self):
+        self.pool.put(self.connection)
+        self.pool.clear()
+        self.pool = self.create_pool(max_size=2, max_age=0)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    @skipped
+    def test_max_idle(self):
+        # This test is timing-sensitive.  Remove the @skipped decorator to
+        # run it, but beware that it could fail or take a while.
+
+        self.pool = self.create_pool(max_size=2, max_idle=0.02)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0.01)  # not long enough to trigger the idle timeout
+        self.assertEqual(len(self.pool.free_items), 1)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0.01)  # idle timeout should have fired but done nothing
+        self.assertEqual(len(self.pool.free_items), 1)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0.03)  # long enough to trigger idle timeout for real
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    @skipped
+    def test_max_idle_many(self):
+        # This test is timing-sensitive.  Remove the @skipped decorator to
+        # run it, but beware that it could fail or take a while.
+
+        self.pool = self.create_pool(max_size=2, max_idle=0.02)
+        self.connection, conn2 = self.pool.get(), self.pool.get()
+        self.connection.close()
+        eventlet.sleep(0.01)
+        self.assertEqual(len(self.pool.free_items), 1)
+        conn2.close()
+        self.assertEqual(len(self.pool.free_items), 2)
+        eventlet.sleep(0.02)  # long enough to expire the first connection, but not conn2
+        self.assertEqual(len(self.pool.free_items), 1)
+
+    @skipped
+    def test_max_age(self):
+        # This test is timing-sensitive.  Remove the @skipped decorator to
+        # run it, but beware that it could fail or take a while.
+
+        self.pool = self.create_pool(max_size=2, max_age=0.05)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0.01)  # not long enough to trigger the age timeout
+        self.assertEqual(len(self.pool.free_items), 1)
+        self.connection = self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0.05)  # long enough to trigger age timeout
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    @skipped
+    def test_max_age_many(self):
+        # This test is timing-sensitive.  Remove the @skipped decorator to
+        # run it, but beware that it could fail or take a while.
+
+        self.pool = self.create_pool(max_size=2, max_age=0.15)
+        self.connection, conn2 = self.pool.get(), self.pool.get()
+        self.connection.close()
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0)  # not long enough to trigger the age timeout
+        self.assertEqual(len(self.pool.free_items), 1)
+        eventlet.sleep(0.2)  # long enough to trigger age timeout
+        self.assertEqual(len(self.pool.free_items), 0)
+        conn2.close()  # should not be added to the free items
+        self.assertEqual(len(self.pool.free_items), 0)
+
+    def test_waiters_get_woken(self):
+        # verify that when there's someone waiting on an empty pool
+        # and someone puts an immediately-closed connection back in
+        # the pool that the waiter gets woken
+        self.pool.put(self.connection)
+        self.pool.clear()
+        self.pool = self.create_pool(max_size=1, max_age=0)
+
+        self.connection = self.pool.get()
+        self.assertEqual(self.pool.free(), 0)
+        self.assertEqual(self.pool.waiting(), 0)
+        e = event.Event()
+
+        def retrieve(pool, ev):
+            c = pool.get()
+            ev.send(c)
+        eventlet.spawn(retrieve, self.pool, e)
+        eventlet.sleep(0)  # these two sleeps should advance the retrieve
+        eventlet.sleep(0)  # coroutine until it's waiting in get()
+        self.assertEqual(self.pool.free(), 0)
+        self.assertEqual(self.pool.waiting(), 1)
+        self.pool.put(self.connection)
+        timer = eventlet.Timeout(1)
+        conn = e.wait()
+        timer.cancel()
+        self.assertEqual(self.pool.free(), 0)
+        self.assertEqual(self.pool.waiting(), 0)
+        self.pool.put(conn)
+
+    @skipped
+    def test_0_straight_benchmark(self):
+        """ Benchmark; don't run unless you want to wait a while."""
+        import time
+        iterations = 20000
+        c = self.connection.cursor()
+        self.connection.commit()
+
+        def bench(c):
+            for i in six.moves.range(iterations):
+                c.execute('select 1')
+
+        bench(c)  # warm-up
+        results = []
+        for i in range(3):
+            start = time.time()
+            bench(c)
+            end = time.time()
+            results.append(end - start)
+
+        print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
+            iterations, sum(results) / len(results), results, type(self)))
+
+    def test_raising_create(self):
+        # if the create() method raises an exception the pool should
+        # not lose any connections
+        self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
+        self.assertRaises(RuntimeError, self.pool.get)
+        self.assertEqual(self.pool.free(), 1)
+
+
+class DummyConnection(object):
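+    # The pool's default cleanup on put() appears to only need rollback(),
+    # so that is the single method this stub provides.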
+    def rollback(self):
+        pass
+
+
+class DummyDBModule(object):
+    def connect(self, *args, **kwargs):
+        return DummyConnection()
+
+
+class RaisingDBModule(object):
+    def connect(self, *args, **kw):
+        raise RuntimeError()
+
+
+class TpoolConnectionPool(DBConnectionPool):
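+    # TpooledConnectionPool proxies each connection through eventlet.tpool, so
+    # blocking driver calls run in OS threads instead of blocking the hub.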
+    __test__ = False  # so that nose doesn't try to execute this directly
+
+    def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
+                    connect_timeout=0.5, module=None):
+        if module is None:
+            module = self._dbmodule
+        return db_pool.TpooledConnectionPool(
+            module,
+            min_size=min_size, max_size=max_size,
+            max_idle=max_idle, max_age=max_age,
+            connect_timeout=connect_timeout,
+            **self._auth)
+
+    @skip_with_pyevent
+    def setUp(self):
+        super(TpoolConnectionPool, self).setUp()
+
+    def tearDown(self):
+        super(TpoolConnectionPool, self).tearDown()
+        from eventlet import tpool
+        tpool.killall()
+
+
+class RawConnectionPool(DBConnectionPool):
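+    # RawConnectionPool hands out the driver's connections directly, without
+    # tpool wrapping, so blocking driver calls can block the whole hub.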
+    __test__ = False  # so that nose doesn't try to execute this directly
+
+    def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
+                    connect_timeout=0.5, module=None):
+        if module is None:
+            module = self._dbmodule
+        return db_pool.RawConnectionPool(
+            module,
+            min_size=min_size, max_size=max_size,
+            max_idle=max_idle, max_age=max_age,
+            connect_timeout=connect_timeout,
+            **self._auth)
+
+
+def test_raw_pool_issue_125():
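+    # Regression test for eventlet issue #125 (referenced by the test name):
+    # connection keyword arguments such as dsn= should pass straight through
+    # to the driver module without breaking the pool.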
+    # pool = self.create_pool(min_size=3, max_size=5)
+    pool = db_pool.RawConnectionPool(
+        DummyDBModule(),
+        dsn="dbname=test user=jessica port=5433",
+        min_size=3, max_size=5)
+    conn = pool.get()
+    pool.put(conn)
+
+
+def test_raw_pool_custom_cleanup_ok():
+    cleanup_mock = mock.Mock()
+    pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
+    conn = pool.get()
+    pool.put(conn)
+    assert cleanup_mock.call_count == 1
+
+    with pool.item() as conn:
+        pass
+    assert cleanup_mock.call_count == 2
+
+
+def test_raw_pool_custom_cleanup_arg_error():
+    cleanup_mock = mock.Mock(side_effect=NotImplementedError)
+    pool = db_pool.RawConnectionPool(DummyDBModule())
+    conn = pool.get()
+    pool.put(conn, cleanup=cleanup_mock)
+    assert cleanup_mock.call_count == 1
+
+    with pool.item(cleanup=cleanup_mock):
+        pass
+    assert cleanup_mock.call_count == 2
+
+
+def test_raw_pool_custom_cleanup_fatal():
+    state = [0]
+
+    def cleanup(conn):
+        state[0] += 1
+        raise KeyboardInterrupt
+
+    pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
+    conn = pool.get()
+    try:
+        pool.put(conn)
+    except KeyboardInterrupt:
+        pass
+    else:
+        assert False, 'Expected KeyboardInterrupt'
+    assert state[0] == 1
+
+
+def test_raw_pool_clear_update_current_size():
+    # https://github.com/eventlet/eventlet/issues/139
+    # BaseConnectionPool.clear does not update .current_size.
+    # That leads to situation when new connections could not be created.
+    pool = db_pool.RawConnectionPool(DummyDBModule())
+    pool.get().close()
+    assert pool.current_size == 1
+    assert len(pool.free_items) == 1
+    pool.clear()
+    assert pool.current_size == 0
+    assert len(pool.free_items) == 0
+
+
+get_auth = get_database_auth
+
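+# get_database_auth() maps driver module names to connect() kwargs.  An
+# assumed example of its shape (real values come from the test environment):
+#     {'MySQLdb': {'host': 'localhost', 'user': 'root', 'passwd': ''},
+#      'psycopg2': {'user': 'test'}}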
+
+def mysql_requirement(_f):
+    verbose = os.environ.get('eventlet_test_mysql_verbose')
+    try:
+        import MySQLdb
+        try:
+            auth = get_auth()['MySQLdb'].copy()
+            MySQLdb.connect(**auth)
+            return True
+        except MySQLdb.OperationalError:
+            if verbose:
+                print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
+                traceback.print_exc()
+            return False
+    except ImportError:
+        if verbose:
+            print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
+        return False
+
+
+class MysqlConnectionPool(object):
+    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
+        (
+        row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
+        value_int INTEGER,
+        value_float FLOAT,
+        value_string VARCHAR(200),
+        value_uuid CHAR(36),
+        value_binary BLOB,
+        value_binary_string VARCHAR(200) BINARY,
+        value_enum ENUM('Y','N'),
+        created TIMESTAMP
+        ) ENGINE=InnoDB;"""
+
+    @skip_unless(mysql_requirement)
+    def setUp(self):
+        import MySQLdb
+        self._dbmodule = MySQLdb
+        self._auth = get_auth()['MySQLdb']
+        super(MysqlConnectionPool, self).setUp()
+
+    def tearDown(self):
+        super(MysqlConnectionPool, self).tearDown()
+
+    def create_db(self):
+        auth = self._auth.copy()
+        try:
+            self.drop_db()
+        except Exception:
+            pass
+        dbname = 'test%s' % os.getpid()
+        db = self._dbmodule.connect(**auth).cursor()
+        db.execute("create database " + dbname)
+        db.close()
+        self._auth['db'] = dbname
+        del db
+
+    def drop_db(self):
+        db = self._dbmodule.connect(**self._auth).cursor()
+        db.execute("drop database " + self._auth['db'])
+        db.close()
+        del db
+
+
+class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
+    __test__ = True
+
+
+class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
+    __test__ = True
+
+
+def postgres_requirement(_f):
+    try:
+        import psycopg2
+        try:
+            auth = get_auth()['psycopg2'].copy()
+            psycopg2.connect(**auth)
+            return True
+        except psycopg2.OperationalError:
+            print("Skipping postgres tests, error when connecting")
+            return False
+    except ImportError:
+        print("Skipping postgres tests, psycopg2 not importable")
+        return False
+
+
+class Psycopg2ConnectionPool(object):
+    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
+        (
+        row_id SERIAL PRIMARY KEY,
+        value_int INTEGER,
+        value_float FLOAT,
+        value_string VARCHAR(200),
+        value_uuid CHAR(36),
+        value_binary BYTEA,
+        value_binary_string BYTEA,
+        created TIMESTAMP
+        );"""
+
+    @skip_unless(postgres_requirement)
+    def setUp(self):
+        import psycopg2
+        self._dbmodule = psycopg2
+        self._auth = get_auth()['psycopg2']
+        super(Psycopg2ConnectionPool, self).setUp()
+
+    def tearDown(self):
+        super(Psycopg2ConnectionPool, self).tearDown()
+
+    def create_db(self):
+        dbname = 'test%s' % os.getpid()
+        self._auth['database'] = dbname
+        try:
+            self.drop_db()
+        except Exception:
+            pass
+        auth = self._auth.copy()
+        auth.pop('database')  # can't create if you're connecting to it
+        conn = self._dbmodule.connect(**auth)
+        conn.set_isolation_level(0)
+        db = conn.cursor()
+        db.execute("create database " + dbname)
+        db.close()
+        conn.close()
+
+    def drop_db(self):
+        auth = self._auth.copy()
+        auth.pop('database')  # can't drop database we connected to
+        conn = self._dbmodule.connect(**auth)
+        conn.set_isolation_level(0)
+        db = conn.cursor()
+        db.execute("drop database " + self._auth['database'])
+        db.close()
+        conn.close()
+
+
+class TestPsycopg2Base(TestCase):
+    __test__ = False
+
+    def test_cursor_works_as_context_manager(self):
+        with self.connection.cursor() as c:
+            c.execute('select 1')
+            row = c.fetchone()
+            assert row == (1,)
+
+
+class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
+    __test__ = True
+
+
+class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
+    __test__ = True
+
+
+if __name__ == '__main__':
+    main()