X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Ftests%2Fdb_pool_test.py;fp=eventlet%2Ftests%2Fdb_pool_test.py;h=9fc9ebc2005232b80486d368c3be116341cd6eb7;hb=376ff3bfe7071cc0793184a378c4e74508fb0d97;hp=0000000000000000000000000000000000000000;hpb=70992db4bef26426213a8eae488be377cdd655ae;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/tests/db_pool_test.py b/eventlet/tests/db_pool_test.py new file mode 100644 index 0000000..9fc9ebc --- /dev/null +++ b/eventlet/tests/db_pool_test.py @@ -0,0 +1,732 @@ +'''Test cases for db_pool +''' +from __future__ import print_function + +import sys +import os +import traceback +from unittest import TestCase, main + +from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth +from eventlet import event +from eventlet import db_pool +from eventlet.support import six +import eventlet + + +class DBTester(object): + __test__ = False # so that nose doesn't try to execute this directly + + def setUp(self): + self.create_db() + self.connection = None + connection = self._dbmodule.connect(**self._auth) + cursor = connection.cursor() + cursor.execute("""CREATE TABLE gargleblatz + ( + a INTEGER + );""") + connection.commit() + cursor.close() + connection.close() + + def tearDown(self): + if self.connection: + self.connection.close() + self.drop_db() + + def set_up_dummy_table(self, connection=None): + close_connection = False + if connection is None: + close_connection = True + if self.connection is None: + connection = self._dbmodule.connect(**self._auth) + else: + connection = self.connection + + cursor = connection.cursor() + cursor.execute(self.dummy_table_sql) + connection.commit() + cursor.close() + if close_connection: + connection.close() + + +# silly mock class +class Mock(object): + pass + + +class DBConnectionPool(DBTester): + __test__ = False # so that nose doesn't try to execute this directly + + def setUp(self): + super(DBConnectionPool, self).setUp() + self.pool = self.create_pool() + self.connection = self.pool.get() + + def tearDown(self): + if self.connection: + self.pool.put(self.connection) + self.pool.clear() + super(DBConnectionPool, self).tearDown() + + def assert_cursor_works(self, cursor): + cursor.execute("select 1") + rows = cursor.fetchall() + assert rows + + def test_connecting(self): + assert self.connection is not None + + def test_create_cursor(self): + cursor = self.connection.cursor() + cursor.close() + + def test_run_query(self): + cursor = self.connection.cursor() + self.assert_cursor_works(cursor) + cursor.close() + + def test_run_bad_query(self): + cursor = self.connection.cursor() + try: + cursor.execute("garbage blah blah") + assert False + except AssertionError: + raise + except Exception: + pass + cursor.close() + + def test_put_none(self): + # the pool is of size 1, and its only connection is out + assert self.pool.free() == 0 + self.pool.put(None) + # ha ha we fooled it into thinking that we had a dead process + assert self.pool.free() == 1 + conn2 = self.pool.get() + assert conn2 is not None + assert conn2.cursor + self.pool.put(conn2) + + def test_close_does_a_put(self): + assert self.pool.free() == 0 + self.connection.close() + assert self.pool.free() == 1 + self.assertRaises(AttributeError, self.connection.cursor) + + @skipped + def test_deletion_does_a_put(self): + # doing a put on del causes some issues if __del__ is called in the + # main coroutine, so, not doing that for now + assert self.pool.free() == 0 + self.connection = None + assert self.pool.free() == 1 + + def test_put_doesnt_double_wrap(self): + self.pool.put(self.connection) + conn = self.pool.get() + assert not isinstance(conn._base, db_pool.PooledConnectionWrapper) + self.pool.put(conn) + + def test_bool(self): + assert self.connection + self.connection.close() + assert not self.connection + + def fill_up_table(self, conn): + curs = conn.cursor() + for i in six.moves.range(1000): + curs.execute('insert into test_table (value_int) values (%s)' % i) + conn.commit() + + def test_returns_immediately(self): + self.pool = self.create_pool() + conn = self.pool.get() + self.set_up_dummy_table(conn) + self.fill_up_table(conn) + curs = conn.cursor() + results = [] + SHORT_QUERY = "select * from test_table" + evt = event.Event() + + def a_query(): + self.assert_cursor_works(curs) + curs.execute(SHORT_QUERY) + results.append(2) + evt.send() + eventlet.spawn(a_query) + results.append(1) + self.assertEqual([1], results) + evt.wait() + self.assertEqual([1, 2], results) + self.pool.put(conn) + + def test_connection_is_clean_after_put(self): + self.pool = self.create_pool() + conn = self.pool.get() + self.set_up_dummy_table(conn) + curs = conn.cursor() + for i in range(10): + curs.execute('insert into test_table (value_int) values (%s)' % i) + # do not commit :-) + self.pool.put(conn) + del conn + conn2 = self.pool.get() + curs2 = conn2.cursor() + for i in range(10): + curs2.execute('insert into test_table (value_int) values (%s)' % i) + conn2.commit() + curs2.execute("select * from test_table") + # we should have only inserted them once + self.assertEqual(10, curs2.rowcount) + self.pool.put(conn2) + + def test_visibility_from_other_connections(self): + self.pool = self.create_pool(max_size=3) + conn = self.pool.get() + conn2 = self.pool.get() + curs = conn.cursor() + try: + curs2 = conn2.cursor() + curs2.execute("insert into gargleblatz (a) values (%s)" % (314159)) + self.assertEqual(curs2.rowcount, 1) + conn2.commit() + selection_query = "select * from gargleblatz" + curs2.execute(selection_query) + self.assertEqual(curs2.rowcount, 1) + del curs2 + self.pool.put(conn2) + # create a new connection, it should see the addition + conn3 = self.pool.get() + curs3 = conn3.cursor() + curs3.execute(selection_query) + self.assertEqual(curs3.rowcount, 1) + # now, does the already-open connection see it? + curs.execute(selection_query) + self.assertEqual(curs.rowcount, 1) + self.pool.put(conn3) + finally: + # clean up my litter + curs.execute("delete from gargleblatz where a=314159") + conn.commit() + self.pool.put(conn) + + @skipped + def test_two_simultaneous_connections(self): + # timing-sensitive test, disabled until we come up with a better + # way to do this + self.pool = self.create_pool(max_size=2) + conn = self.pool.get() + self.set_up_dummy_table(conn) + self.fill_up_table(conn) + curs = conn.cursor() + conn2 = self.pool.get() + self.set_up_dummy_table(conn2) + self.fill_up_table(conn2) + curs2 = conn2.cursor() + results = [] + LONG_QUERY = "select * from test_table" + SHORT_QUERY = "select * from test_table where row_id <= 20" + + evt = event.Event() + + def long_running_query(): + self.assert_cursor_works(curs) + curs.execute(LONG_QUERY) + results.append(1) + evt.send() + evt2 = event.Event() + + def short_running_query(): + self.assert_cursor_works(curs2) + curs2.execute(SHORT_QUERY) + results.append(2) + evt2.send() + + eventlet.spawn(long_running_query) + eventlet.spawn(short_running_query) + evt.wait() + evt2.wait() + results.sort() + self.assertEqual([1, 2], results) + + def test_clear(self): + self.pool = self.create_pool() + self.pool.put(self.connection) + self.pool.clear() + self.assertEqual(len(self.pool.free_items), 0) + + def test_clear_warmup(self): + """Clear implicitly created connections (min_size > 0)""" + self.pool = self.create_pool(min_size=1) + self.pool.clear() + self.assertEqual(len(self.pool.free_items), 0) + + def test_unwrap_connection(self): + self.assert_(isinstance(self.connection, + db_pool.GenericConnectionWrapper)) + conn = self.pool._unwrap_connection(self.connection) + assert not isinstance(conn, db_pool.GenericConnectionWrapper) + + self.assertEqual(None, self.pool._unwrap_connection(None)) + self.assertEqual(None, self.pool._unwrap_connection(1)) + + # testing duck typing here -- as long as the connection has a + # _base attribute, it should be unwrappable + x = Mock() + x._base = 'hi' + self.assertEqual('hi', self.pool._unwrap_connection(x)) + conn.close() + + def test_safe_close(self): + self.pool._safe_close(self.connection, quiet=True) + self.assertEqual(len(self.pool.free_items), 1) + + self.pool._safe_close(None) + self.pool._safe_close(1) + + # now we're really going for 100% coverage + x = Mock() + + def fail(): + raise KeyboardInterrupt() + x.close = fail + self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x) + + x = Mock() + + def fail2(): + raise RuntimeError("if this line has been printed, the test succeeded") + x.close = fail2 + self.pool._safe_close(x, quiet=False) + + def test_zero_max_idle(self): + self.pool.put(self.connection) + self.pool.clear() + self.pool = self.create_pool(max_size=2, max_idle=0) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 0) + + def test_zero_max_age(self): + self.pool.put(self.connection) + self.pool.clear() + self.pool = self.create_pool(max_size=2, max_age=0) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 0) + + @skipped + def test_max_idle(self): + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + + self.pool = self.create_pool(max_size=2, max_idle=0.02) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0.01) # not long enough to trigger the idle timeout + self.assertEqual(len(self.pool.free_items), 1) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0.01) # idle timeout should have fired but done nothing + self.assertEqual(len(self.pool.free_items), 1) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0.03) # long enough to trigger idle timeout for real + self.assertEqual(len(self.pool.free_items), 0) + + @skipped + def test_max_idle_many(self): + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + + self.pool = self.create_pool(max_size=2, max_idle=0.02) + self.connection, conn2 = self.pool.get(), self.pool.get() + self.connection.close() + eventlet.sleep(0.01) + self.assertEqual(len(self.pool.free_items), 1) + conn2.close() + self.assertEqual(len(self.pool.free_items), 2) + eventlet.sleep(0.02) # trigger cleanup of conn1 but not conn2 + self.assertEqual(len(self.pool.free_items), 1) + + @skipped + def test_max_age(self): + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + + self.pool = self.create_pool(max_size=2, max_age=0.05) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0.01) # not long enough to trigger the age timeout + self.assertEqual(len(self.pool.free_items), 1) + self.connection = self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0.05) # long enough to trigger age timeout + self.assertEqual(len(self.pool.free_items), 0) + + @skipped + def test_max_age_many(self): + # This test is timing-sensitive. Rename the function without + # the "dont" to run it, but beware that it could fail or take + # a while. + + self.pool = self.create_pool(max_size=2, max_age=0.15) + self.connection, conn2 = self.pool.get(), self.pool.get() + self.connection.close() + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0) # not long enough to trigger the age timeout + self.assertEqual(len(self.pool.free_items), 1) + eventlet.sleep(0.2) # long enough to trigger age timeout + self.assertEqual(len(self.pool.free_items), 0) + conn2.close() # should not be added to the free items + self.assertEqual(len(self.pool.free_items), 0) + + def test_waiters_get_woken(self): + # verify that when there's someone waiting on an empty pool + # and someone puts an immediately-closed connection back in + # the pool that the waiter gets woken + self.pool.put(self.connection) + self.pool.clear() + self.pool = self.create_pool(max_size=1, max_age=0) + + self.connection = self.pool.get() + self.assertEqual(self.pool.free(), 0) + self.assertEqual(self.pool.waiting(), 0) + e = event.Event() + + def retrieve(pool, ev): + c = pool.get() + ev.send(c) + eventlet.spawn(retrieve, self.pool, e) + eventlet.sleep(0) # these two sleeps should advance the retrieve + eventlet.sleep(0) # coroutine until it's waiting in get() + self.assertEqual(self.pool.free(), 0) + self.assertEqual(self.pool.waiting(), 1) + self.pool.put(self.connection) + timer = eventlet.Timeout(1) + conn = e.wait() + timer.cancel() + self.assertEqual(self.pool.free(), 0) + self.assertEqual(self.pool.waiting(), 0) + self.pool.put(conn) + + @skipped + def test_0_straight_benchmark(self): + """ Benchmark; don't run unless you want to wait a while.""" + import time + iterations = 20000 + c = self.connection.cursor() + self.connection.commit() + + def bench(c): + for i in six.moves.range(iterations): + c.execute('select 1') + + bench(c) # warm-up + results = [] + for i in range(3): + start = time.time() + bench(c) + end = time.time() + results.append(end - start) + + print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( + iterations, sum(results) / len(results), results, type(self))) + + def test_raising_create(self): + # if the create() method raises an exception the pool should + # not lose any connections + self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) + self.assertRaises(RuntimeError, self.pool.get) + self.assertEqual(self.pool.free(), 1) + + +class DummyConnection(object): + def rollback(self): + pass + + +class DummyDBModule(object): + def connect(self, *args, **kwargs): + return DummyConnection() + + +class RaisingDBModule(object): + def connect(self, *args, **kw): + raise RuntimeError() + + +class TpoolConnectionPool(DBConnectionPool): + __test__ = False # so that nose doesn't try to execute this directly + + def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10, + connect_timeout=0.5, module=None): + if module is None: + module = self._dbmodule + return db_pool.TpooledConnectionPool( + module, + min_size=min_size, max_size=max_size, + max_idle=max_idle, max_age=max_age, + connect_timeout=connect_timeout, + **self._auth) + + @skip_with_pyevent + def setUp(self): + super(TpoolConnectionPool, self).setUp() + + def tearDown(self): + super(TpoolConnectionPool, self).tearDown() + from eventlet import tpool + tpool.killall() + + +class RawConnectionPool(DBConnectionPool): + __test__ = False # so that nose doesn't try to execute this directly + + def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10, + connect_timeout=0.5, module=None): + if module is None: + module = self._dbmodule + return db_pool.RawConnectionPool( + module, + min_size=min_size, max_size=max_size, + max_idle=max_idle, max_age=max_age, + connect_timeout=connect_timeout, + **self._auth) + + +def test_raw_pool_issue_125(): + # pool = self.create_pool(min_size=3, max_size=5) + pool = db_pool.RawConnectionPool( + DummyDBModule(), + dsn="dbname=test user=jessica port=5433", + min_size=3, max_size=5) + conn = pool.get() + pool.put(conn) + + +def test_raw_pool_custom_cleanup_ok(): + cleanup_mock = mock.Mock() + pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock) + conn = pool.get() + pool.put(conn) + assert cleanup_mock.call_count == 1 + + with pool.item() as conn: + pass + assert cleanup_mock.call_count == 2 + + +def test_raw_pool_custom_cleanup_arg_error(): + cleanup_mock = mock.Mock(side_effect=NotImplementedError) + pool = db_pool.RawConnectionPool(DummyDBModule()) + conn = pool.get() + pool.put(conn, cleanup=cleanup_mock) + assert cleanup_mock.call_count == 1 + + with pool.item(cleanup=cleanup_mock): + pass + assert cleanup_mock.call_count == 2 + + +def test_raw_pool_custom_cleanup_fatal(): + state = [0] + + def cleanup(conn): + state[0] += 1 + raise KeyboardInterrupt + + pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup) + conn = pool.get() + try: + pool.put(conn) + except KeyboardInterrupt: + pass + else: + assert False, 'Expected KeyboardInterrupt' + assert state[0] == 1 + + +def test_raw_pool_clear_update_current_size(): + # https://github.com/eventlet/eventlet/issues/139 + # BaseConnectionPool.clear does not update .current_size. + # That leads to situation when new connections could not be created. + pool = db_pool.RawConnectionPool(DummyDBModule()) + pool.get().close() + assert pool.current_size == 1 + assert len(pool.free_items) == 1 + pool.clear() + assert pool.current_size == 0 + assert len(pool.free_items) == 0 + + +get_auth = get_database_auth + + +def mysql_requirement(_f): + verbose = os.environ.get('eventlet_test_mysql_verbose') + try: + import MySQLdb + try: + auth = get_auth()['MySQLdb'].copy() + MySQLdb.connect(**auth) + return True + except MySQLdb.OperationalError: + if verbose: + print(">> Skipping mysql tests, error when connecting:", file=sys.stderr) + traceback.print_exc() + return False + except ImportError: + if verbose: + print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr) + return False + + +class MysqlConnectionPool(object): + dummy_table_sql = """CREATE TEMPORARY TABLE test_table + ( + row_id INTEGER PRIMARY KEY AUTO_INCREMENT, + value_int INTEGER, + value_float FLOAT, + value_string VARCHAR(200), + value_uuid CHAR(36), + value_binary BLOB, + value_binary_string VARCHAR(200) BINARY, + value_enum ENUM('Y','N'), + created TIMESTAMP + ) ENGINE=InnoDB;""" + + @skip_unless(mysql_requirement) + def setUp(self): + import MySQLdb + self._dbmodule = MySQLdb + self._auth = get_auth()['MySQLdb'] + super(MysqlConnectionPool, self).setUp() + + def tearDown(self): + super(MysqlConnectionPool, self).tearDown() + + def create_db(self): + auth = self._auth.copy() + try: + self.drop_db() + except Exception: + pass + dbname = 'test%s' % os.getpid() + db = self._dbmodule.connect(**auth).cursor() + db.execute("create database " + dbname) + db.close() + self._auth['db'] = dbname + del db + + def drop_db(self): + db = self._dbmodule.connect(**self._auth).cursor() + db.execute("drop database " + self._auth['db']) + db.close() + del db + + +class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase): + __test__ = True + + +class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase): + __test__ = True + + +def postgres_requirement(_f): + try: + import psycopg2 + try: + auth = get_auth()['psycopg2'].copy() + psycopg2.connect(**auth) + return True + except psycopg2.OperationalError: + print("Skipping postgres tests, error when connecting") + return False + except ImportError: + print("Skipping postgres tests, psycopg2 not importable") + return False + + +class Psycopg2ConnectionPool(object): + dummy_table_sql = """CREATE TEMPORARY TABLE test_table + ( + row_id SERIAL PRIMARY KEY, + value_int INTEGER, + value_float FLOAT, + value_string VARCHAR(200), + value_uuid CHAR(36), + value_binary BYTEA, + value_binary_string BYTEA, + created TIMESTAMP + );""" + + @skip_unless(postgres_requirement) + def setUp(self): + import psycopg2 + self._dbmodule = psycopg2 + self._auth = get_auth()['psycopg2'] + super(Psycopg2ConnectionPool, self).setUp() + + def tearDown(self): + super(Psycopg2ConnectionPool, self).tearDown() + + def create_db(self): + dbname = 'test%s' % os.getpid() + self._auth['database'] = dbname + try: + self.drop_db() + except Exception: + pass + auth = self._auth.copy() + auth.pop('database') # can't create if you're connecting to it + conn = self._dbmodule.connect(**auth) + conn.set_isolation_level(0) + db = conn.cursor() + db.execute("create database " + dbname) + db.close() + conn.close() + + def drop_db(self): + auth = self._auth.copy() + auth.pop('database') # can't drop database we connected to + conn = self._dbmodule.connect(**auth) + conn.set_isolation_level(0) + db = conn.cursor() + db.execute("drop database " + self._auth['database']) + db.close() + conn.close() + + +class TestPsycopg2Base(TestCase): + __test__ = False + + def test_cursor_works_as_context_manager(self): + with self.connection.cursor() as c: + c.execute('select 1') + row = c.fetchone() + assert row == (1,) + + +class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base): + __test__ = True + + +class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base): + __test__ = True + + +if __name__ == '__main__': + main()