--- /dev/null
+'''Test cases for db_pool
+'''
+from __future__ import print_function
+
+import sys
+import os
+import traceback
+from unittest import TestCase, main
+
+from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
+from eventlet import event
+from eventlet import db_pool
+from eventlet.support import six
+import eventlet
+
+
+class DBTester(object):
+ __test__ = False # so that nose doesn't try to execute this directly
+
+ def setUp(self):
+ self.create_db()
+ self.connection = None
+ connection = self._dbmodule.connect(**self._auth)
+ cursor = connection.cursor()
+ cursor.execute("""CREATE TABLE gargleblatz
+ (
+ a INTEGER
+ );""")
+ connection.commit()
+ cursor.close()
+ connection.close()
+
+ def tearDown(self):
+ if self.connection:
+ self.connection.close()
+ self.drop_db()
+
+ def set_up_dummy_table(self, connection=None):
+ close_connection = False
+ if connection is None:
+ close_connection = True
+ if self.connection is None:
+ connection = self._dbmodule.connect(**self._auth)
+ else:
+ connection = self.connection
+
+ cursor = connection.cursor()
+ cursor.execute(self.dummy_table_sql)
+ connection.commit()
+ cursor.close()
+ if close_connection:
+ connection.close()
+
+
+# silly mock class
+class Mock(object):
+ pass
+
+
+class DBConnectionPool(DBTester):
+ __test__ = False # so that nose doesn't try to execute this directly
+
+ def setUp(self):
+ super(DBConnectionPool, self).setUp()
+ self.pool = self.create_pool()
+ self.connection = self.pool.get()
+
+ def tearDown(self):
+ if self.connection:
+ self.pool.put(self.connection)
+ self.pool.clear()
+ super(DBConnectionPool, self).tearDown()
+
+ def assert_cursor_works(self, cursor):
+ cursor.execute("select 1")
+ rows = cursor.fetchall()
+ assert rows
+
+ def test_connecting(self):
+ assert self.connection is not None
+
+ def test_create_cursor(self):
+ cursor = self.connection.cursor()
+ cursor.close()
+
+ def test_run_query(self):
+ cursor = self.connection.cursor()
+ self.assert_cursor_works(cursor)
+ cursor.close()
+
+ def test_run_bad_query(self):
+ cursor = self.connection.cursor()
+ try:
+ cursor.execute("garbage blah blah")
+ assert False
+ except AssertionError:
+ raise
+ except Exception:
+ pass
+ cursor.close()
+
+ def test_put_none(self):
+ # the pool is of size 1, and its only connection is out
+ assert self.pool.free() == 0
+ self.pool.put(None)
+ # ha ha we fooled it into thinking that we had a dead process
+ assert self.pool.free() == 1
+ conn2 = self.pool.get()
+ assert conn2 is not None
+ assert conn2.cursor
+ self.pool.put(conn2)
+
+ def test_close_does_a_put(self):
+ assert self.pool.free() == 0
+ self.connection.close()
+ assert self.pool.free() == 1
+ self.assertRaises(AttributeError, self.connection.cursor)
+
+ @skipped
+ def test_deletion_does_a_put(self):
+ # doing a put on del causes some issues if __del__ is called in the
+ # main coroutine, so, not doing that for now
+ assert self.pool.free() == 0
+ self.connection = None
+ assert self.pool.free() == 1
+
+ def test_put_doesnt_double_wrap(self):
+ self.pool.put(self.connection)
+ conn = self.pool.get()
+ assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
+ self.pool.put(conn)
+
+ def test_bool(self):
+ assert self.connection
+ self.connection.close()
+ assert not self.connection
+
+ def fill_up_table(self, conn):
+ curs = conn.cursor()
+ for i in six.moves.range(1000):
+ curs.execute('insert into test_table (value_int) values (%s)' % i)
+ conn.commit()
+
+ def test_returns_immediately(self):
+ self.pool = self.create_pool()
+ conn = self.pool.get()
+ self.set_up_dummy_table(conn)
+ self.fill_up_table(conn)
+ curs = conn.cursor()
+ results = []
+ SHORT_QUERY = "select * from test_table"
+ evt = event.Event()
+
+ def a_query():
+ self.assert_cursor_works(curs)
+ curs.execute(SHORT_QUERY)
+ results.append(2)
+ evt.send()
+ eventlet.spawn(a_query)
+ results.append(1)
+ self.assertEqual([1], results)
+ evt.wait()
+ self.assertEqual([1, 2], results)
+ self.pool.put(conn)
+
+ def test_connection_is_clean_after_put(self):
+ self.pool = self.create_pool()
+ conn = self.pool.get()
+ self.set_up_dummy_table(conn)
+ curs = conn.cursor()
+ for i in range(10):
+ curs.execute('insert into test_table (value_int) values (%s)' % i)
+ # do not commit :-)
+ self.pool.put(conn)
+ del conn
+ conn2 = self.pool.get()
+ curs2 = conn2.cursor()
+ for i in range(10):
+ curs2.execute('insert into test_table (value_int) values (%s)' % i)
+ conn2.commit()
+ curs2.execute("select * from test_table")
+ # we should have only inserted them once
+ self.assertEqual(10, curs2.rowcount)
+ self.pool.put(conn2)
+
+ def test_visibility_from_other_connections(self):
+ self.pool = self.create_pool(max_size=3)
+ conn = self.pool.get()
+ conn2 = self.pool.get()
+ curs = conn.cursor()
+ try:
+ curs2 = conn2.cursor()
+ curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
+ self.assertEqual(curs2.rowcount, 1)
+ conn2.commit()
+ selection_query = "select * from gargleblatz"
+ curs2.execute(selection_query)
+ self.assertEqual(curs2.rowcount, 1)
+ del curs2
+ self.pool.put(conn2)
+ # create a new connection, it should see the addition
+ conn3 = self.pool.get()
+ curs3 = conn3.cursor()
+ curs3.execute(selection_query)
+ self.assertEqual(curs3.rowcount, 1)
+ # now, does the already-open connection see it?
+ curs.execute(selection_query)
+ self.assertEqual(curs.rowcount, 1)
+ self.pool.put(conn3)
+ finally:
+ # clean up my litter
+ curs.execute("delete from gargleblatz where a=314159")
+ conn.commit()
+ self.pool.put(conn)
+
+ @skipped
+ def test_two_simultaneous_connections(self):
+ # timing-sensitive test, disabled until we come up with a better
+ # way to do this
+ self.pool = self.create_pool(max_size=2)
+ conn = self.pool.get()
+ self.set_up_dummy_table(conn)
+ self.fill_up_table(conn)
+ curs = conn.cursor()
+ conn2 = self.pool.get()
+ self.set_up_dummy_table(conn2)
+ self.fill_up_table(conn2)
+ curs2 = conn2.cursor()
+ results = []
+ LONG_QUERY = "select * from test_table"
+ SHORT_QUERY = "select * from test_table where row_id <= 20"
+
+ evt = event.Event()
+
+ def long_running_query():
+ self.assert_cursor_works(curs)
+ curs.execute(LONG_QUERY)
+ results.append(1)
+ evt.send()
+ evt2 = event.Event()
+
+ def short_running_query():
+ self.assert_cursor_works(curs2)
+ curs2.execute(SHORT_QUERY)
+ results.append(2)
+ evt2.send()
+
+ eventlet.spawn(long_running_query)
+ eventlet.spawn(short_running_query)
+ evt.wait()
+ evt2.wait()
+ results.sort()
+ self.assertEqual([1, 2], results)
+
+ def test_clear(self):
+ self.pool = self.create_pool()
+ self.pool.put(self.connection)
+ self.pool.clear()
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ def test_clear_warmup(self):
+ """Clear implicitly created connections (min_size > 0)"""
+ self.pool = self.create_pool(min_size=1)
+ self.pool.clear()
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ def test_unwrap_connection(self):
+ self.assert_(isinstance(self.connection,
+ db_pool.GenericConnectionWrapper))
+ conn = self.pool._unwrap_connection(self.connection)
+ assert not isinstance(conn, db_pool.GenericConnectionWrapper)
+
+ self.assertEqual(None, self.pool._unwrap_connection(None))
+ self.assertEqual(None, self.pool._unwrap_connection(1))
+
+ # testing duck typing here -- as long as the connection has a
+ # _base attribute, it should be unwrappable
+ x = Mock()
+ x._base = 'hi'
+ self.assertEqual('hi', self.pool._unwrap_connection(x))
+ conn.close()
+
+ def test_safe_close(self):
+ self.pool._safe_close(self.connection, quiet=True)
+ self.assertEqual(len(self.pool.free_items), 1)
+
+ self.pool._safe_close(None)
+ self.pool._safe_close(1)
+
+ # now we're really going for 100% coverage
+ x = Mock()
+
+ def fail():
+ raise KeyboardInterrupt()
+ x.close = fail
+ self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
+
+ x = Mock()
+
+ def fail2():
+ raise RuntimeError("if this line has been printed, the test succeeded")
+ x.close = fail2
+ self.pool._safe_close(x, quiet=False)
+
+ def test_zero_max_idle(self):
+ self.pool.put(self.connection)
+ self.pool.clear()
+ self.pool = self.create_pool(max_size=2, max_idle=0)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ def test_zero_max_age(self):
+ self.pool.put(self.connection)
+ self.pool.clear()
+ self.pool = self.create_pool(max_size=2, max_age=0)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ @skipped
+ def test_max_idle(self):
+ # This test is timing-sensitive. Rename the function without
+ # the "dont" to run it, but beware that it could fail or take
+ # a while.
+
+ self.pool = self.create_pool(max_size=2, max_idle=0.02)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0.01) # not long enough to trigger the idle timeout
+ self.assertEqual(len(self.pool.free_items), 1)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0.01) # idle timeout should have fired but done nothing
+ self.assertEqual(len(self.pool.free_items), 1)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0.03) # long enough to trigger idle timeout for real
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ @skipped
+ def test_max_idle_many(self):
+ # This test is timing-sensitive. Rename the function without
+ # the "dont" to run it, but beware that it could fail or take
+ # a while.
+
+ self.pool = self.create_pool(max_size=2, max_idle=0.02)
+ self.connection, conn2 = self.pool.get(), self.pool.get()
+ self.connection.close()
+ eventlet.sleep(0.01)
+ self.assertEqual(len(self.pool.free_items), 1)
+ conn2.close()
+ self.assertEqual(len(self.pool.free_items), 2)
+ eventlet.sleep(0.02) # trigger cleanup of conn1 but not conn2
+ self.assertEqual(len(self.pool.free_items), 1)
+
+ @skipped
+ def test_max_age(self):
+ # This test is timing-sensitive. Rename the function without
+ # the "dont" to run it, but beware that it could fail or take
+ # a while.
+
+ self.pool = self.create_pool(max_size=2, max_age=0.05)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0.01) # not long enough to trigger the age timeout
+ self.assertEqual(len(self.pool.free_items), 1)
+ self.connection = self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0.05) # long enough to trigger age timeout
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ @skipped
+ def test_max_age_many(self):
+ # This test is timing-sensitive. Rename the function without
+ # the "dont" to run it, but beware that it could fail or take
+ # a while.
+
+ self.pool = self.create_pool(max_size=2, max_age=0.15)
+ self.connection, conn2 = self.pool.get(), self.pool.get()
+ self.connection.close()
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0) # not long enough to trigger the age timeout
+ self.assertEqual(len(self.pool.free_items), 1)
+ eventlet.sleep(0.2) # long enough to trigger age timeout
+ self.assertEqual(len(self.pool.free_items), 0)
+ conn2.close() # should not be added to the free items
+ self.assertEqual(len(self.pool.free_items), 0)
+
+ def test_waiters_get_woken(self):
+ # verify that when there's someone waiting on an empty pool
+ # and someone puts an immediately-closed connection back in
+ # the pool that the waiter gets woken
+ self.pool.put(self.connection)
+ self.pool.clear()
+ self.pool = self.create_pool(max_size=1, max_age=0)
+
+ self.connection = self.pool.get()
+ self.assertEqual(self.pool.free(), 0)
+ self.assertEqual(self.pool.waiting(), 0)
+ e = event.Event()
+
+ def retrieve(pool, ev):
+ c = pool.get()
+ ev.send(c)
+ eventlet.spawn(retrieve, self.pool, e)
+ eventlet.sleep(0) # these two sleeps should advance the retrieve
+ eventlet.sleep(0) # coroutine until it's waiting in get()
+ self.assertEqual(self.pool.free(), 0)
+ self.assertEqual(self.pool.waiting(), 1)
+ self.pool.put(self.connection)
+ timer = eventlet.Timeout(1)
+ conn = e.wait()
+ timer.cancel()
+ self.assertEqual(self.pool.free(), 0)
+ self.assertEqual(self.pool.waiting(), 0)
+ self.pool.put(conn)
+
+ @skipped
+ def test_0_straight_benchmark(self):
+ """ Benchmark; don't run unless you want to wait a while."""
+ import time
+ iterations = 20000
+ c = self.connection.cursor()
+ self.connection.commit()
+
+ def bench(c):
+ for i in six.moves.range(iterations):
+ c.execute('select 1')
+
+ bench(c) # warm-up
+ results = []
+ for i in range(3):
+ start = time.time()
+ bench(c)
+ end = time.time()
+ results.append(end - start)
+
+ print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
+ iterations, sum(results) / len(results), results, type(self)))
+
+ def test_raising_create(self):
+ # if the create() method raises an exception the pool should
+ # not lose any connections
+ self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
+ self.assertRaises(RuntimeError, self.pool.get)
+ self.assertEqual(self.pool.free(), 1)
+
+
+class DummyConnection(object):
+ def rollback(self):
+ pass
+
+
+class DummyDBModule(object):
+ def connect(self, *args, **kwargs):
+ return DummyConnection()
+
+
+class RaisingDBModule(object):
+ def connect(self, *args, **kw):
+ raise RuntimeError()
+
+
+class TpoolConnectionPool(DBConnectionPool):
+ __test__ = False # so that nose doesn't try to execute this directly
+
+ def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
+ connect_timeout=0.5, module=None):
+ if module is None:
+ module = self._dbmodule
+ return db_pool.TpooledConnectionPool(
+ module,
+ min_size=min_size, max_size=max_size,
+ max_idle=max_idle, max_age=max_age,
+ connect_timeout=connect_timeout,
+ **self._auth)
+
+ @skip_with_pyevent
+ def setUp(self):
+ super(TpoolConnectionPool, self).setUp()
+
+ def tearDown(self):
+ super(TpoolConnectionPool, self).tearDown()
+ from eventlet import tpool
+ tpool.killall()
+
+
+class RawConnectionPool(DBConnectionPool):
+ __test__ = False # so that nose doesn't try to execute this directly
+
+ def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
+ connect_timeout=0.5, module=None):
+ if module is None:
+ module = self._dbmodule
+ return db_pool.RawConnectionPool(
+ module,
+ min_size=min_size, max_size=max_size,
+ max_idle=max_idle, max_age=max_age,
+ connect_timeout=connect_timeout,
+ **self._auth)
+
+
+def test_raw_pool_issue_125():
+ # pool = self.create_pool(min_size=3, max_size=5)
+ pool = db_pool.RawConnectionPool(
+ DummyDBModule(),
+ dsn="dbname=test user=jessica port=5433",
+ min_size=3, max_size=5)
+ conn = pool.get()
+ pool.put(conn)
+
+
+def test_raw_pool_custom_cleanup_ok():
+ cleanup_mock = mock.Mock()
+ pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
+ conn = pool.get()
+ pool.put(conn)
+ assert cleanup_mock.call_count == 1
+
+ with pool.item() as conn:
+ pass
+ assert cleanup_mock.call_count == 2
+
+
+def test_raw_pool_custom_cleanup_arg_error():
+ cleanup_mock = mock.Mock(side_effect=NotImplementedError)
+ pool = db_pool.RawConnectionPool(DummyDBModule())
+ conn = pool.get()
+ pool.put(conn, cleanup=cleanup_mock)
+ assert cleanup_mock.call_count == 1
+
+ with pool.item(cleanup=cleanup_mock):
+ pass
+ assert cleanup_mock.call_count == 2
+
+
+def test_raw_pool_custom_cleanup_fatal():
+ state = [0]
+
+ def cleanup(conn):
+ state[0] += 1
+ raise KeyboardInterrupt
+
+ pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
+ conn = pool.get()
+ try:
+ pool.put(conn)
+ except KeyboardInterrupt:
+ pass
+ else:
+ assert False, 'Expected KeyboardInterrupt'
+ assert state[0] == 1
+
+
+def test_raw_pool_clear_update_current_size():
+ # https://github.com/eventlet/eventlet/issues/139
+ # BaseConnectionPool.clear does not update .current_size.
+ # That leads to situation when new connections could not be created.
+ pool = db_pool.RawConnectionPool(DummyDBModule())
+ pool.get().close()
+ assert pool.current_size == 1
+ assert len(pool.free_items) == 1
+ pool.clear()
+ assert pool.current_size == 0
+ assert len(pool.free_items) == 0
+
+
+get_auth = get_database_auth
+
+
+def mysql_requirement(_f):
+ verbose = os.environ.get('eventlet_test_mysql_verbose')
+ try:
+ import MySQLdb
+ try:
+ auth = get_auth()['MySQLdb'].copy()
+ MySQLdb.connect(**auth)
+ return True
+ except MySQLdb.OperationalError:
+ if verbose:
+ print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
+ traceback.print_exc()
+ return False
+ except ImportError:
+ if verbose:
+ print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
+ return False
+
+
+class MysqlConnectionPool(object):
+ dummy_table_sql = """CREATE TEMPORARY TABLE test_table
+ (
+ row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
+ value_int INTEGER,
+ value_float FLOAT,
+ value_string VARCHAR(200),
+ value_uuid CHAR(36),
+ value_binary BLOB,
+ value_binary_string VARCHAR(200) BINARY,
+ value_enum ENUM('Y','N'),
+ created TIMESTAMP
+ ) ENGINE=InnoDB;"""
+
+ @skip_unless(mysql_requirement)
+ def setUp(self):
+ import MySQLdb
+ self._dbmodule = MySQLdb
+ self._auth = get_auth()['MySQLdb']
+ super(MysqlConnectionPool, self).setUp()
+
+ def tearDown(self):
+ super(MysqlConnectionPool, self).tearDown()
+
+ def create_db(self):
+ auth = self._auth.copy()
+ try:
+ self.drop_db()
+ except Exception:
+ pass
+ dbname = 'test%s' % os.getpid()
+ db = self._dbmodule.connect(**auth).cursor()
+ db.execute("create database " + dbname)
+ db.close()
+ self._auth['db'] = dbname
+ del db
+
+ def drop_db(self):
+ db = self._dbmodule.connect(**self._auth).cursor()
+ db.execute("drop database " + self._auth['db'])
+ db.close()
+ del db
+
+
+class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
+ __test__ = True
+
+
+class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
+ __test__ = True
+
+
+def postgres_requirement(_f):
+ try:
+ import psycopg2
+ try:
+ auth = get_auth()['psycopg2'].copy()
+ psycopg2.connect(**auth)
+ return True
+ except psycopg2.OperationalError:
+ print("Skipping postgres tests, error when connecting")
+ return False
+ except ImportError:
+ print("Skipping postgres tests, psycopg2 not importable")
+ return False
+
+
+class Psycopg2ConnectionPool(object):
+ dummy_table_sql = """CREATE TEMPORARY TABLE test_table
+ (
+ row_id SERIAL PRIMARY KEY,
+ value_int INTEGER,
+ value_float FLOAT,
+ value_string VARCHAR(200),
+ value_uuid CHAR(36),
+ value_binary BYTEA,
+ value_binary_string BYTEA,
+ created TIMESTAMP
+ );"""
+
+ @skip_unless(postgres_requirement)
+ def setUp(self):
+ import psycopg2
+ self._dbmodule = psycopg2
+ self._auth = get_auth()['psycopg2']
+ super(Psycopg2ConnectionPool, self).setUp()
+
+ def tearDown(self):
+ super(Psycopg2ConnectionPool, self).tearDown()
+
+ def create_db(self):
+ dbname = 'test%s' % os.getpid()
+ self._auth['database'] = dbname
+ try:
+ self.drop_db()
+ except Exception:
+ pass
+ auth = self._auth.copy()
+ auth.pop('database') # can't create if you're connecting to it
+ conn = self._dbmodule.connect(**auth)
+ conn.set_isolation_level(0)
+ db = conn.cursor()
+ db.execute("create database " + dbname)
+ db.close()
+ conn.close()
+
+ def drop_db(self):
+ auth = self._auth.copy()
+ auth.pop('database') # can't drop database we connected to
+ conn = self._dbmodule.connect(**auth)
+ conn.set_isolation_level(0)
+ db = conn.cursor()
+ db.execute("drop database " + self._auth['database'])
+ db.close()
+ conn.close()
+
+
+class TestPsycopg2Base(TestCase):
+ __test__ = False
+
+ def test_cursor_works_as_context_manager(self):
+ with self.connection.cursor() as c:
+ c.execute('select 1')
+ row = c.fetchone()
+ assert row == (1,)
+
+
+class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
+ __test__ = True
+
+
+class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
+ __test__ = True
+
+
+if __name__ == '__main__':
+ main()