X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Ftests%2Fdb_pool_test.py;fp=eventlet%2Ftests%2Fdb_pool_test.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=9fc9ebc2005232b80486d368c3be116341cd6eb7;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/tests/db_pool_test.py b/eventlet/tests/db_pool_test.py deleted file mode 100644 index 9fc9ebc..0000000 --- a/eventlet/tests/db_pool_test.py +++ /dev/null @@ -1,732 +0,0 @@ -'''Test cases for db_pool -''' -from __future__ import print_function - -import sys -import os -import traceback -from unittest import TestCase, main - -from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth -from eventlet import event -from eventlet import db_pool -from eventlet.support import six -import eventlet - - -class DBTester(object): - __test__ = False # so that nose doesn't try to execute this directly - - def setUp(self): - self.create_db() - self.connection = None - connection = self._dbmodule.connect(**self._auth) - cursor = connection.cursor() - cursor.execute("""CREATE TABLE gargleblatz - ( - a INTEGER - );""") - connection.commit() - cursor.close() - connection.close() - - def tearDown(self): - if self.connection: - self.connection.close() - self.drop_db() - - def set_up_dummy_table(self, connection=None): - close_connection = False - if connection is None: - close_connection = True - if self.connection is None: - connection = self._dbmodule.connect(**self._auth) - else: - connection = self.connection - - cursor = connection.cursor() - cursor.execute(self.dummy_table_sql) - connection.commit() - cursor.close() - if close_connection: - connection.close() - - -# silly mock class -class Mock(object): - pass - - -class DBConnectionPool(DBTester): - __test__ = False # so that nose doesn't try to execute this directly - - def setUp(self): - super(DBConnectionPool, self).setUp() - self.pool = self.create_pool() - self.connection = self.pool.get() - - def tearDown(self): - if self.connection: - self.pool.put(self.connection) - self.pool.clear() - super(DBConnectionPool, self).tearDown() - - def assert_cursor_works(self, cursor): - cursor.execute("select 1") - rows = cursor.fetchall() - assert rows - - def test_connecting(self): - assert self.connection is not None - - def test_create_cursor(self): - cursor = self.connection.cursor() - cursor.close() - - def test_run_query(self): - cursor = self.connection.cursor() - self.assert_cursor_works(cursor) - cursor.close() - - def test_run_bad_query(self): - cursor = self.connection.cursor() - try: - cursor.execute("garbage blah blah") - assert False - except AssertionError: - raise - except Exception: - pass - cursor.close() - - def test_put_none(self): - # the pool is of size 1, and its only connection is out - assert self.pool.free() == 0 - self.pool.put(None) - # ha ha we fooled it into thinking that we had a dead process - assert self.pool.free() == 1 - conn2 = self.pool.get() - assert conn2 is not None - assert conn2.cursor - self.pool.put(conn2) - - def test_close_does_a_put(self): - assert self.pool.free() == 0 - self.connection.close() - assert self.pool.free() == 1 - self.assertRaises(AttributeError, self.connection.cursor) - - @skipped - def test_deletion_does_a_put(self): - # doing a put on del causes some issues if __del__ is called in the - # main coroutine, so, not doing that for now - assert self.pool.free() == 0 - self.connection = None - assert self.pool.free() == 1 - - def test_put_doesnt_double_wrap(self): - self.pool.put(self.connection) - conn = self.pool.get() - assert not isinstance(conn._base, db_pool.PooledConnectionWrapper) - self.pool.put(conn) - - def test_bool(self): - assert self.connection - self.connection.close() - assert not self.connection - - def fill_up_table(self, conn): - curs = conn.cursor() - for i in six.moves.range(1000): - curs.execute('insert into test_table (value_int) values (%s)' % i) - conn.commit() - - def test_returns_immediately(self): - self.pool = self.create_pool() - conn = self.pool.get() - self.set_up_dummy_table(conn) - self.fill_up_table(conn) - curs = conn.cursor() - results = [] - SHORT_QUERY = "select * from test_table" - evt = event.Event() - - def a_query(): - self.assert_cursor_works(curs) - curs.execute(SHORT_QUERY) - results.append(2) - evt.send() - eventlet.spawn(a_query) - results.append(1) - self.assertEqual([1], results) - evt.wait() - self.assertEqual([1, 2], results) - self.pool.put(conn) - - def test_connection_is_clean_after_put(self): - self.pool = self.create_pool() - conn = self.pool.get() - self.set_up_dummy_table(conn) - curs = conn.cursor() - for i in range(10): - curs.execute('insert into test_table (value_int) values (%s)' % i) - # do not commit :-) - self.pool.put(conn) - del conn - conn2 = self.pool.get() - curs2 = conn2.cursor() - for i in range(10): - curs2.execute('insert into test_table (value_int) values (%s)' % i) - conn2.commit() - curs2.execute("select * from test_table") - # we should have only inserted them once - self.assertEqual(10, curs2.rowcount) - self.pool.put(conn2) - - def test_visibility_from_other_connections(self): - self.pool = self.create_pool(max_size=3) - conn = self.pool.get() - conn2 = self.pool.get() - curs = conn.cursor() - try: - curs2 = conn2.cursor() - curs2.execute("insert into gargleblatz (a) values (%s)" % (314159)) - self.assertEqual(curs2.rowcount, 1) - conn2.commit() - selection_query = "select * from gargleblatz" - curs2.execute(selection_query) - self.assertEqual(curs2.rowcount, 1) - del curs2 - self.pool.put(conn2) - # create a new connection, it should see the addition - conn3 = self.pool.get() - curs3 = conn3.cursor() - curs3.execute(selection_query) - self.assertEqual(curs3.rowcount, 1) - # now, does the already-open connection see it? - curs.execute(selection_query) - self.assertEqual(curs.rowcount, 1) - self.pool.put(conn3) - finally: - # clean up my litter - curs.execute("delete from gargleblatz where a=314159") - conn.commit() - self.pool.put(conn) - - @skipped - def test_two_simultaneous_connections(self): - # timing-sensitive test, disabled until we come up with a better - # way to do this - self.pool = self.create_pool(max_size=2) - conn = self.pool.get() - self.set_up_dummy_table(conn) - self.fill_up_table(conn) - curs = conn.cursor() - conn2 = self.pool.get() - self.set_up_dummy_table(conn2) - self.fill_up_table(conn2) - curs2 = conn2.cursor() - results = [] - LONG_QUERY = "select * from test_table" - SHORT_QUERY = "select * from test_table where row_id <= 20" - - evt = event.Event() - - def long_running_query(): - self.assert_cursor_works(curs) - curs.execute(LONG_QUERY) - results.append(1) - evt.send() - evt2 = event.Event() - - def short_running_query(): - self.assert_cursor_works(curs2) - curs2.execute(SHORT_QUERY) - results.append(2) - evt2.send() - - eventlet.spawn(long_running_query) - eventlet.spawn(short_running_query) - evt.wait() - evt2.wait() - results.sort() - self.assertEqual([1, 2], results) - - def test_clear(self): - self.pool = self.create_pool() - self.pool.put(self.connection) - self.pool.clear() - self.assertEqual(len(self.pool.free_items), 0) - - def test_clear_warmup(self): - """Clear implicitly created connections (min_size > 0)""" - self.pool = self.create_pool(min_size=1) - self.pool.clear() - self.assertEqual(len(self.pool.free_items), 0) - - def test_unwrap_connection(self): - self.assert_(isinstance(self.connection, - db_pool.GenericConnectionWrapper)) - conn = self.pool._unwrap_connection(self.connection) - assert not isinstance(conn, db_pool.GenericConnectionWrapper) - - self.assertEqual(None, self.pool._unwrap_connection(None)) - self.assertEqual(None, self.pool._unwrap_connection(1)) - - # testing duck typing here -- as long as the connection has a - # _base attribute, it should be unwrappable - x = Mock() - x._base = 'hi' - self.assertEqual('hi', self.pool._unwrap_connection(x)) - conn.close() - - def test_safe_close(self): - self.pool._safe_close(self.connection, quiet=True) - self.assertEqual(len(self.pool.free_items), 1) - - self.pool._safe_close(None) - self.pool._safe_close(1) - - # now we're really going for 100% coverage - x = Mock() - - def fail(): - raise KeyboardInterrupt() - x.close = fail - self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x) - - x = Mock() - - def fail2(): - raise RuntimeError("if this line has been printed, the test succeeded") - x.close = fail2 - self.pool._safe_close(x, quiet=False) - - def test_zero_max_idle(self): - self.pool.put(self.connection) - self.pool.clear() - self.pool = self.create_pool(max_size=2, max_idle=0) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 0) - - def test_zero_max_age(self): - self.pool.put(self.connection) - self.pool.clear() - self.pool = self.create_pool(max_size=2, max_age=0) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 0) - - @skipped - def test_max_idle(self): - # This test is timing-sensitive. Rename the function without - # the "dont" to run it, but beware that it could fail or take - # a while. - - self.pool = self.create_pool(max_size=2, max_idle=0.02) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0.01) # not long enough to trigger the idle timeout - self.assertEqual(len(self.pool.free_items), 1) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0.01) # idle timeout should have fired but done nothing - self.assertEqual(len(self.pool.free_items), 1) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0.03) # long enough to trigger idle timeout for real - self.assertEqual(len(self.pool.free_items), 0) - - @skipped - def test_max_idle_many(self): - # This test is timing-sensitive. Rename the function without - # the "dont" to run it, but beware that it could fail or take - # a while. - - self.pool = self.create_pool(max_size=2, max_idle=0.02) - self.connection, conn2 = self.pool.get(), self.pool.get() - self.connection.close() - eventlet.sleep(0.01) - self.assertEqual(len(self.pool.free_items), 1) - conn2.close() - self.assertEqual(len(self.pool.free_items), 2) - eventlet.sleep(0.02) # trigger cleanup of conn1 but not conn2 - self.assertEqual(len(self.pool.free_items), 1) - - @skipped - def test_max_age(self): - # This test is timing-sensitive. Rename the function without - # the "dont" to run it, but beware that it could fail or take - # a while. - - self.pool = self.create_pool(max_size=2, max_age=0.05) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0.01) # not long enough to trigger the age timeout - self.assertEqual(len(self.pool.free_items), 1) - self.connection = self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0.05) # long enough to trigger age timeout - self.assertEqual(len(self.pool.free_items), 0) - - @skipped - def test_max_age_many(self): - # This test is timing-sensitive. Rename the function without - # the "dont" to run it, but beware that it could fail or take - # a while. - - self.pool = self.create_pool(max_size=2, max_age=0.15) - self.connection, conn2 = self.pool.get(), self.pool.get() - self.connection.close() - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0) # not long enough to trigger the age timeout - self.assertEqual(len(self.pool.free_items), 1) - eventlet.sleep(0.2) # long enough to trigger age timeout - self.assertEqual(len(self.pool.free_items), 0) - conn2.close() # should not be added to the free items - self.assertEqual(len(self.pool.free_items), 0) - - def test_waiters_get_woken(self): - # verify that when there's someone waiting on an empty pool - # and someone puts an immediately-closed connection back in - # the pool that the waiter gets woken - self.pool.put(self.connection) - self.pool.clear() - self.pool = self.create_pool(max_size=1, max_age=0) - - self.connection = self.pool.get() - self.assertEqual(self.pool.free(), 0) - self.assertEqual(self.pool.waiting(), 0) - e = event.Event() - - def retrieve(pool, ev): - c = pool.get() - ev.send(c) - eventlet.spawn(retrieve, self.pool, e) - eventlet.sleep(0) # these two sleeps should advance the retrieve - eventlet.sleep(0) # coroutine until it's waiting in get() - self.assertEqual(self.pool.free(), 0) - self.assertEqual(self.pool.waiting(), 1) - self.pool.put(self.connection) - timer = eventlet.Timeout(1) - conn = e.wait() - timer.cancel() - self.assertEqual(self.pool.free(), 0) - self.assertEqual(self.pool.waiting(), 0) - self.pool.put(conn) - - @skipped - def test_0_straight_benchmark(self): - """ Benchmark; don't run unless you want to wait a while.""" - import time - iterations = 20000 - c = self.connection.cursor() - self.connection.commit() - - def bench(c): - for i in six.moves.range(iterations): - c.execute('select 1') - - bench(c) # warm-up - results = [] - for i in range(3): - start = time.time() - bench(c) - end = time.time() - results.append(end - start) - - print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % ( - iterations, sum(results) / len(results), results, type(self))) - - def test_raising_create(self): - # if the create() method raises an exception the pool should - # not lose any connections - self.pool = self.create_pool(max_size=1, module=RaisingDBModule()) - self.assertRaises(RuntimeError, self.pool.get) - self.assertEqual(self.pool.free(), 1) - - -class DummyConnection(object): - def rollback(self): - pass - - -class DummyDBModule(object): - def connect(self, *args, **kwargs): - return DummyConnection() - - -class RaisingDBModule(object): - def connect(self, *args, **kw): - raise RuntimeError() - - -class TpoolConnectionPool(DBConnectionPool): - __test__ = False # so that nose doesn't try to execute this directly - - def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10, - connect_timeout=0.5, module=None): - if module is None: - module = self._dbmodule - return db_pool.TpooledConnectionPool( - module, - min_size=min_size, max_size=max_size, - max_idle=max_idle, max_age=max_age, - connect_timeout=connect_timeout, - **self._auth) - - @skip_with_pyevent - def setUp(self): - super(TpoolConnectionPool, self).setUp() - - def tearDown(self): - super(TpoolConnectionPool, self).tearDown() - from eventlet import tpool - tpool.killall() - - -class RawConnectionPool(DBConnectionPool): - __test__ = False # so that nose doesn't try to execute this directly - - def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10, - connect_timeout=0.5, module=None): - if module is None: - module = self._dbmodule - return db_pool.RawConnectionPool( - module, - min_size=min_size, max_size=max_size, - max_idle=max_idle, max_age=max_age, - connect_timeout=connect_timeout, - **self._auth) - - -def test_raw_pool_issue_125(): - # pool = self.create_pool(min_size=3, max_size=5) - pool = db_pool.RawConnectionPool( - DummyDBModule(), - dsn="dbname=test user=jessica port=5433", - min_size=3, max_size=5) - conn = pool.get() - pool.put(conn) - - -def test_raw_pool_custom_cleanup_ok(): - cleanup_mock = mock.Mock() - pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock) - conn = pool.get() - pool.put(conn) - assert cleanup_mock.call_count == 1 - - with pool.item() as conn: - pass - assert cleanup_mock.call_count == 2 - - -def test_raw_pool_custom_cleanup_arg_error(): - cleanup_mock = mock.Mock(side_effect=NotImplementedError) - pool = db_pool.RawConnectionPool(DummyDBModule()) - conn = pool.get() - pool.put(conn, cleanup=cleanup_mock) - assert cleanup_mock.call_count == 1 - - with pool.item(cleanup=cleanup_mock): - pass - assert cleanup_mock.call_count == 2 - - -def test_raw_pool_custom_cleanup_fatal(): - state = [0] - - def cleanup(conn): - state[0] += 1 - raise KeyboardInterrupt - - pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup) - conn = pool.get() - try: - pool.put(conn) - except KeyboardInterrupt: - pass - else: - assert False, 'Expected KeyboardInterrupt' - assert state[0] == 1 - - -def test_raw_pool_clear_update_current_size(): - # https://github.com/eventlet/eventlet/issues/139 - # BaseConnectionPool.clear does not update .current_size. - # That leads to situation when new connections could not be created. - pool = db_pool.RawConnectionPool(DummyDBModule()) - pool.get().close() - assert pool.current_size == 1 - assert len(pool.free_items) == 1 - pool.clear() - assert pool.current_size == 0 - assert len(pool.free_items) == 0 - - -get_auth = get_database_auth - - -def mysql_requirement(_f): - verbose = os.environ.get('eventlet_test_mysql_verbose') - try: - import MySQLdb - try: - auth = get_auth()['MySQLdb'].copy() - MySQLdb.connect(**auth) - return True - except MySQLdb.OperationalError: - if verbose: - print(">> Skipping mysql tests, error when connecting:", file=sys.stderr) - traceback.print_exc() - return False - except ImportError: - if verbose: - print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr) - return False - - -class MysqlConnectionPool(object): - dummy_table_sql = """CREATE TEMPORARY TABLE test_table - ( - row_id INTEGER PRIMARY KEY AUTO_INCREMENT, - value_int INTEGER, - value_float FLOAT, - value_string VARCHAR(200), - value_uuid CHAR(36), - value_binary BLOB, - value_binary_string VARCHAR(200) BINARY, - value_enum ENUM('Y','N'), - created TIMESTAMP - ) ENGINE=InnoDB;""" - - @skip_unless(mysql_requirement) - def setUp(self): - import MySQLdb - self._dbmodule = MySQLdb - self._auth = get_auth()['MySQLdb'] - super(MysqlConnectionPool, self).setUp() - - def tearDown(self): - super(MysqlConnectionPool, self).tearDown() - - def create_db(self): - auth = self._auth.copy() - try: - self.drop_db() - except Exception: - pass - dbname = 'test%s' % os.getpid() - db = self._dbmodule.connect(**auth).cursor() - db.execute("create database " + dbname) - db.close() - self._auth['db'] = dbname - del db - - def drop_db(self): - db = self._dbmodule.connect(**self._auth).cursor() - db.execute("drop database " + self._auth['db']) - db.close() - del db - - -class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase): - __test__ = True - - -class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase): - __test__ = True - - -def postgres_requirement(_f): - try: - import psycopg2 - try: - auth = get_auth()['psycopg2'].copy() - psycopg2.connect(**auth) - return True - except psycopg2.OperationalError: - print("Skipping postgres tests, error when connecting") - return False - except ImportError: - print("Skipping postgres tests, psycopg2 not importable") - return False - - -class Psycopg2ConnectionPool(object): - dummy_table_sql = """CREATE TEMPORARY TABLE test_table - ( - row_id SERIAL PRIMARY KEY, - value_int INTEGER, - value_float FLOAT, - value_string VARCHAR(200), - value_uuid CHAR(36), - value_binary BYTEA, - value_binary_string BYTEA, - created TIMESTAMP - );""" - - @skip_unless(postgres_requirement) - def setUp(self): - import psycopg2 - self._dbmodule = psycopg2 - self._auth = get_auth()['psycopg2'] - super(Psycopg2ConnectionPool, self).setUp() - - def tearDown(self): - super(Psycopg2ConnectionPool, self).tearDown() - - def create_db(self): - dbname = 'test%s' % os.getpid() - self._auth['database'] = dbname - try: - self.drop_db() - except Exception: - pass - auth = self._auth.copy() - auth.pop('database') # can't create if you're connecting to it - conn = self._dbmodule.connect(**auth) - conn.set_isolation_level(0) - db = conn.cursor() - db.execute("create database " + dbname) - db.close() - conn.close() - - def drop_db(self): - auth = self._auth.copy() - auth.pop('database') # can't drop database we connected to - conn = self._dbmodule.connect(**auth) - conn.set_isolation_level(0) - db = conn.cursor() - db.execute("drop database " + self._auth['database']) - db.close() - conn.close() - - -class TestPsycopg2Base(TestCase): - __test__ = False - - def test_cursor_works_as_context_manager(self): - with self.connection.cursor() as c: - c.execute('select 1') - row = c.fetchone() - assert row == (1,) - - -class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base): - __test__ = True - - -class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base): - __test__ = True - - -if __name__ == '__main__': - main()