+++ /dev/null
-'''Test cases for db_pool
-'''
-from __future__ import print_function
-
-import sys
-import os
-import traceback
-from unittest import TestCase, main
-
-from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
-from eventlet import event
-from eventlet import db_pool
-from eventlet.support import six
-import eventlet
-
-
-class DBTester(object):
- __test__ = False # so that nose doesn't try to execute this directly
-
- def setUp(self):
- self.create_db()
- self.connection = None
- connection = self._dbmodule.connect(**self._auth)
- cursor = connection.cursor()
- cursor.execute("""CREATE TABLE gargleblatz
- (
- a INTEGER
- );""")
- connection.commit()
- cursor.close()
- connection.close()
-
- def tearDown(self):
- if self.connection:
- self.connection.close()
- self.drop_db()
-
- def set_up_dummy_table(self, connection=None):
- close_connection = False
- if connection is None:
- close_connection = True
- if self.connection is None:
- connection = self._dbmodule.connect(**self._auth)
- else:
- connection = self.connection
-
- cursor = connection.cursor()
- cursor.execute(self.dummy_table_sql)
- connection.commit()
- cursor.close()
- if close_connection:
- connection.close()
-
-
-# silly mock class
-class Mock(object):
- pass
-
-
-class DBConnectionPool(DBTester):
- __test__ = False # so that nose doesn't try to execute this directly
-
- def setUp(self):
- super(DBConnectionPool, self).setUp()
- self.pool = self.create_pool()
- self.connection = self.pool.get()
-
- def tearDown(self):
- if self.connection:
- self.pool.put(self.connection)
- self.pool.clear()
- super(DBConnectionPool, self).tearDown()
-
- def assert_cursor_works(self, cursor):
- cursor.execute("select 1")
- rows = cursor.fetchall()
- assert rows
-
- def test_connecting(self):
- assert self.connection is not None
-
- def test_create_cursor(self):
- cursor = self.connection.cursor()
- cursor.close()
-
- def test_run_query(self):
- cursor = self.connection.cursor()
- self.assert_cursor_works(cursor)
- cursor.close()
-
- def test_run_bad_query(self):
- cursor = self.connection.cursor()
- try:
- cursor.execute("garbage blah blah")
- assert False
- except AssertionError:
- raise
- except Exception:
- pass
- cursor.close()
-
- def test_put_none(self):
- # the pool is of size 1, and its only connection is out
- assert self.pool.free() == 0
- self.pool.put(None)
- # ha ha we fooled it into thinking that we had a dead process
- assert self.pool.free() == 1
- conn2 = self.pool.get()
- assert conn2 is not None
- assert conn2.cursor
- self.pool.put(conn2)
-
- def test_close_does_a_put(self):
- assert self.pool.free() == 0
- self.connection.close()
- assert self.pool.free() == 1
- self.assertRaises(AttributeError, self.connection.cursor)
-
- @skipped
- def test_deletion_does_a_put(self):
- # doing a put on del causes some issues if __del__ is called in the
- # main coroutine, so, not doing that for now
- assert self.pool.free() == 0
- self.connection = None
- assert self.pool.free() == 1
-
- def test_put_doesnt_double_wrap(self):
- self.pool.put(self.connection)
- conn = self.pool.get()
- assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
- self.pool.put(conn)
-
- def test_bool(self):
- assert self.connection
- self.connection.close()
- assert not self.connection
-
- def fill_up_table(self, conn):
- curs = conn.cursor()
- for i in six.moves.range(1000):
- curs.execute('insert into test_table (value_int) values (%s)' % i)
- conn.commit()
-
- def test_returns_immediately(self):
- self.pool = self.create_pool()
- conn = self.pool.get()
- self.set_up_dummy_table(conn)
- self.fill_up_table(conn)
- curs = conn.cursor()
- results = []
- SHORT_QUERY = "select * from test_table"
- evt = event.Event()
-
- def a_query():
- self.assert_cursor_works(curs)
- curs.execute(SHORT_QUERY)
- results.append(2)
- evt.send()
- eventlet.spawn(a_query)
- results.append(1)
- self.assertEqual([1], results)
- evt.wait()
- self.assertEqual([1, 2], results)
- self.pool.put(conn)
-
- def test_connection_is_clean_after_put(self):
- self.pool = self.create_pool()
- conn = self.pool.get()
- self.set_up_dummy_table(conn)
- curs = conn.cursor()
- for i in range(10):
- curs.execute('insert into test_table (value_int) values (%s)' % i)
- # do not commit :-)
- self.pool.put(conn)
- del conn
- conn2 = self.pool.get()
- curs2 = conn2.cursor()
- for i in range(10):
- curs2.execute('insert into test_table (value_int) values (%s)' % i)
- conn2.commit()
- curs2.execute("select * from test_table")
- # we should have only inserted them once
- self.assertEqual(10, curs2.rowcount)
- self.pool.put(conn2)
-
- def test_visibility_from_other_connections(self):
- self.pool = self.create_pool(max_size=3)
- conn = self.pool.get()
- conn2 = self.pool.get()
- curs = conn.cursor()
- try:
- curs2 = conn2.cursor()
- curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
- self.assertEqual(curs2.rowcount, 1)
- conn2.commit()
- selection_query = "select * from gargleblatz"
- curs2.execute(selection_query)
- self.assertEqual(curs2.rowcount, 1)
- del curs2
- self.pool.put(conn2)
- # create a new connection, it should see the addition
- conn3 = self.pool.get()
- curs3 = conn3.cursor()
- curs3.execute(selection_query)
- self.assertEqual(curs3.rowcount, 1)
- # now, does the already-open connection see it?
- curs.execute(selection_query)
- self.assertEqual(curs.rowcount, 1)
- self.pool.put(conn3)
- finally:
- # clean up my litter
- curs.execute("delete from gargleblatz where a=314159")
- conn.commit()
- self.pool.put(conn)
-
- @skipped
- def test_two_simultaneous_connections(self):
- # timing-sensitive test, disabled until we come up with a better
- # way to do this
- self.pool = self.create_pool(max_size=2)
- conn = self.pool.get()
- self.set_up_dummy_table(conn)
- self.fill_up_table(conn)
- curs = conn.cursor()
- conn2 = self.pool.get()
- self.set_up_dummy_table(conn2)
- self.fill_up_table(conn2)
- curs2 = conn2.cursor()
- results = []
- LONG_QUERY = "select * from test_table"
- SHORT_QUERY = "select * from test_table where row_id <= 20"
-
- evt = event.Event()
-
- def long_running_query():
- self.assert_cursor_works(curs)
- curs.execute(LONG_QUERY)
- results.append(1)
- evt.send()
- evt2 = event.Event()
-
- def short_running_query():
- self.assert_cursor_works(curs2)
- curs2.execute(SHORT_QUERY)
- results.append(2)
- evt2.send()
-
- eventlet.spawn(long_running_query)
- eventlet.spawn(short_running_query)
- evt.wait()
- evt2.wait()
- results.sort()
- self.assertEqual([1, 2], results)
-
- def test_clear(self):
- self.pool = self.create_pool()
- self.pool.put(self.connection)
- self.pool.clear()
- self.assertEqual(len(self.pool.free_items), 0)
-
- def test_clear_warmup(self):
- """Clear implicitly created connections (min_size > 0)"""
- self.pool = self.create_pool(min_size=1)
- self.pool.clear()
- self.assertEqual(len(self.pool.free_items), 0)
-
- def test_unwrap_connection(self):
- self.assert_(isinstance(self.connection,
- db_pool.GenericConnectionWrapper))
- conn = self.pool._unwrap_connection(self.connection)
- assert not isinstance(conn, db_pool.GenericConnectionWrapper)
-
- self.assertEqual(None, self.pool._unwrap_connection(None))
- self.assertEqual(None, self.pool._unwrap_connection(1))
-
- # testing duck typing here -- as long as the connection has a
- # _base attribute, it should be unwrappable
- x = Mock()
- x._base = 'hi'
- self.assertEqual('hi', self.pool._unwrap_connection(x))
- conn.close()
-
- def test_safe_close(self):
- self.pool._safe_close(self.connection, quiet=True)
- self.assertEqual(len(self.pool.free_items), 1)
-
- self.pool._safe_close(None)
- self.pool._safe_close(1)
-
- # now we're really going for 100% coverage
- x = Mock()
-
- def fail():
- raise KeyboardInterrupt()
- x.close = fail
- self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
-
- x = Mock()
-
- def fail2():
- raise RuntimeError("if this line has been printed, the test succeeded")
- x.close = fail2
- self.pool._safe_close(x, quiet=False)
-
- def test_zero_max_idle(self):
- self.pool.put(self.connection)
- self.pool.clear()
- self.pool = self.create_pool(max_size=2, max_idle=0)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 0)
-
- def test_zero_max_age(self):
- self.pool.put(self.connection)
- self.pool.clear()
- self.pool = self.create_pool(max_size=2, max_age=0)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 0)
-
- @skipped
- def test_max_idle(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_idle=0.02)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.01) # not long enough to trigger the idle timeout
- self.assertEqual(len(self.pool.free_items), 1)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.01) # idle timeout should have fired but done nothing
- self.assertEqual(len(self.pool.free_items), 1)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.03) # long enough to trigger idle timeout for real
- self.assertEqual(len(self.pool.free_items), 0)
-
- @skipped
- def test_max_idle_many(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_idle=0.02)
- self.connection, conn2 = self.pool.get(), self.pool.get()
- self.connection.close()
- eventlet.sleep(0.01)
- self.assertEqual(len(self.pool.free_items), 1)
- conn2.close()
- self.assertEqual(len(self.pool.free_items), 2)
- eventlet.sleep(0.02) # trigger cleanup of conn1 but not conn2
- self.assertEqual(len(self.pool.free_items), 1)
-
- @skipped
- def test_max_age(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_age=0.05)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.01) # not long enough to trigger the age timeout
- self.assertEqual(len(self.pool.free_items), 1)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.05) # long enough to trigger age timeout
- self.assertEqual(len(self.pool.free_items), 0)
-
- @skipped
- def test_max_age_many(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_age=0.15)
- self.connection, conn2 = self.pool.get(), self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0) # not long enough to trigger the age timeout
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.2) # long enough to trigger age timeout
- self.assertEqual(len(self.pool.free_items), 0)
- conn2.close() # should not be added to the free items
- self.assertEqual(len(self.pool.free_items), 0)
-
- def test_waiters_get_woken(self):
- # verify that when there's someone waiting on an empty pool
- # and someone puts an immediately-closed connection back in
- # the pool that the waiter gets woken
- self.pool.put(self.connection)
- self.pool.clear()
- self.pool = self.create_pool(max_size=1, max_age=0)
-
- self.connection = self.pool.get()
- self.assertEqual(self.pool.free(), 0)
- self.assertEqual(self.pool.waiting(), 0)
- e = event.Event()
-
- def retrieve(pool, ev):
- c = pool.get()
- ev.send(c)
- eventlet.spawn(retrieve, self.pool, e)
- eventlet.sleep(0) # these two sleeps should advance the retrieve
- eventlet.sleep(0) # coroutine until it's waiting in get()
- self.assertEqual(self.pool.free(), 0)
- self.assertEqual(self.pool.waiting(), 1)
- self.pool.put(self.connection)
- timer = eventlet.Timeout(1)
- conn = e.wait()
- timer.cancel()
- self.assertEqual(self.pool.free(), 0)
- self.assertEqual(self.pool.waiting(), 0)
- self.pool.put(conn)
-
- @skipped
- def test_0_straight_benchmark(self):
- """ Benchmark; don't run unless you want to wait a while."""
- import time
- iterations = 20000
- c = self.connection.cursor()
- self.connection.commit()
-
- def bench(c):
- for i in six.moves.range(iterations):
- c.execute('select 1')
-
- bench(c) # warm-up
- results = []
- for i in range(3):
- start = time.time()
- bench(c)
- end = time.time()
- results.append(end - start)
-
- print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
- iterations, sum(results) / len(results), results, type(self)))
-
- def test_raising_create(self):
- # if the create() method raises an exception the pool should
- # not lose any connections
- self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
- self.assertRaises(RuntimeError, self.pool.get)
- self.assertEqual(self.pool.free(), 1)
-
-
-class DummyConnection(object):
- def rollback(self):
- pass
-
-
-class DummyDBModule(object):
- def connect(self, *args, **kwargs):
- return DummyConnection()
-
-
-class RaisingDBModule(object):
- def connect(self, *args, **kw):
- raise RuntimeError()
-
-
-class TpoolConnectionPool(DBConnectionPool):
- __test__ = False # so that nose doesn't try to execute this directly
-
- def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
- connect_timeout=0.5, module=None):
- if module is None:
- module = self._dbmodule
- return db_pool.TpooledConnectionPool(
- module,
- min_size=min_size, max_size=max_size,
- max_idle=max_idle, max_age=max_age,
- connect_timeout=connect_timeout,
- **self._auth)
-
- @skip_with_pyevent
- def setUp(self):
- super(TpoolConnectionPool, self).setUp()
-
- def tearDown(self):
- super(TpoolConnectionPool, self).tearDown()
- from eventlet import tpool
- tpool.killall()
-
-
-class RawConnectionPool(DBConnectionPool):
- __test__ = False # so that nose doesn't try to execute this directly
-
- def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
- connect_timeout=0.5, module=None):
- if module is None:
- module = self._dbmodule
- return db_pool.RawConnectionPool(
- module,
- min_size=min_size, max_size=max_size,
- max_idle=max_idle, max_age=max_age,
- connect_timeout=connect_timeout,
- **self._auth)
-
-
-def test_raw_pool_issue_125():
- # pool = self.create_pool(min_size=3, max_size=5)
- pool = db_pool.RawConnectionPool(
- DummyDBModule(),
- dsn="dbname=test user=jessica port=5433",
- min_size=3, max_size=5)
- conn = pool.get()
- pool.put(conn)
-
-
-def test_raw_pool_custom_cleanup_ok():
- cleanup_mock = mock.Mock()
- pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
- conn = pool.get()
- pool.put(conn)
- assert cleanup_mock.call_count == 1
-
- with pool.item() as conn:
- pass
- assert cleanup_mock.call_count == 2
-
-
-def test_raw_pool_custom_cleanup_arg_error():
- cleanup_mock = mock.Mock(side_effect=NotImplementedError)
- pool = db_pool.RawConnectionPool(DummyDBModule())
- conn = pool.get()
- pool.put(conn, cleanup=cleanup_mock)
- assert cleanup_mock.call_count == 1
-
- with pool.item(cleanup=cleanup_mock):
- pass
- assert cleanup_mock.call_count == 2
-
-
-def test_raw_pool_custom_cleanup_fatal():
- state = [0]
-
- def cleanup(conn):
- state[0] += 1
- raise KeyboardInterrupt
-
- pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
- conn = pool.get()
- try:
- pool.put(conn)
- except KeyboardInterrupt:
- pass
- else:
- assert False, 'Expected KeyboardInterrupt'
- assert state[0] == 1
-
-
-def test_raw_pool_clear_update_current_size():
- # https://github.com/eventlet/eventlet/issues/139
- # BaseConnectionPool.clear does not update .current_size.
- # That leads to situation when new connections could not be created.
- pool = db_pool.RawConnectionPool(DummyDBModule())
- pool.get().close()
- assert pool.current_size == 1
- assert len(pool.free_items) == 1
- pool.clear()
- assert pool.current_size == 0
- assert len(pool.free_items) == 0
-
-
-get_auth = get_database_auth
-
-
-def mysql_requirement(_f):
- verbose = os.environ.get('eventlet_test_mysql_verbose')
- try:
- import MySQLdb
- try:
- auth = get_auth()['MySQLdb'].copy()
- MySQLdb.connect(**auth)
- return True
- except MySQLdb.OperationalError:
- if verbose:
- print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
- traceback.print_exc()
- return False
- except ImportError:
- if verbose:
- print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
- return False
-
-
-class MysqlConnectionPool(object):
- dummy_table_sql = """CREATE TEMPORARY TABLE test_table
- (
- row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
- value_int INTEGER,
- value_float FLOAT,
- value_string VARCHAR(200),
- value_uuid CHAR(36),
- value_binary BLOB,
- value_binary_string VARCHAR(200) BINARY,
- value_enum ENUM('Y','N'),
- created TIMESTAMP
- ) ENGINE=InnoDB;"""
-
- @skip_unless(mysql_requirement)
- def setUp(self):
- import MySQLdb
- self._dbmodule = MySQLdb
- self._auth = get_auth()['MySQLdb']
- super(MysqlConnectionPool, self).setUp()
-
- def tearDown(self):
- super(MysqlConnectionPool, self).tearDown()
-
- def create_db(self):
- auth = self._auth.copy()
- try:
- self.drop_db()
- except Exception:
- pass
- dbname = 'test%s' % os.getpid()
- db = self._dbmodule.connect(**auth).cursor()
- db.execute("create database " + dbname)
- db.close()
- self._auth['db'] = dbname
- del db
-
- def drop_db(self):
- db = self._dbmodule.connect(**self._auth).cursor()
- db.execute("drop database " + self._auth['db'])
- db.close()
- del db
-
-
-class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
- __test__ = True
-
-
-class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
- __test__ = True
-
-
-def postgres_requirement(_f):
- try:
- import psycopg2
- try:
- auth = get_auth()['psycopg2'].copy()
- psycopg2.connect(**auth)
- return True
- except psycopg2.OperationalError:
- print("Skipping postgres tests, error when connecting")
- return False
- except ImportError:
- print("Skipping postgres tests, psycopg2 not importable")
- return False
-
-
-class Psycopg2ConnectionPool(object):
- dummy_table_sql = """CREATE TEMPORARY TABLE test_table
- (
- row_id SERIAL PRIMARY KEY,
- value_int INTEGER,
- value_float FLOAT,
- value_string VARCHAR(200),
- value_uuid CHAR(36),
- value_binary BYTEA,
- value_binary_string BYTEA,
- created TIMESTAMP
- );"""
-
- @skip_unless(postgres_requirement)
- def setUp(self):
- import psycopg2
- self._dbmodule = psycopg2
- self._auth = get_auth()['psycopg2']
- super(Psycopg2ConnectionPool, self).setUp()
-
- def tearDown(self):
- super(Psycopg2ConnectionPool, self).tearDown()
-
- def create_db(self):
- dbname = 'test%s' % os.getpid()
- self._auth['database'] = dbname
- try:
- self.drop_db()
- except Exception:
- pass
- auth = self._auth.copy()
- auth.pop('database') # can't create if you're connecting to it
- conn = self._dbmodule.connect(**auth)
- conn.set_isolation_level(0)
- db = conn.cursor()
- db.execute("create database " + dbname)
- db.close()
- conn.close()
-
- def drop_db(self):
- auth = self._auth.copy()
- auth.pop('database') # can't drop database we connected to
- conn = self._dbmodule.connect(**auth)
- conn.set_isolation_level(0)
- db = conn.cursor()
- db.execute("drop database " + self._auth['database'])
- db.close()
- conn.close()
-
-
-class TestPsycopg2Base(TestCase):
- __test__ = False
-
- def test_cursor_works_as_context_manager(self):
- with self.connection.cursor() as c:
- c.execute('select 1')
- row = c.fetchone()
- assert row == (1,)
-
-
-class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
- __test__ = True
-
-
-class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
- __test__ = True
-
-
-if __name__ == '__main__':
- main()