1 '''Test cases for db_pool
3 from __future__ import print_function
8 from unittest import TestCase, main
10 from tests import mock, skip_unless, skip_with_pyevent, get_database_auth
11 from eventlet import event
12 from eventlet import db_pool
13 from eventlet.support import six
17 class DBTester(object):
18 __test__ = False # so that nose doesn't try to execute this directly
22 self.connection = None
23 connection = self._dbmodule.connect(**self._auth)
24 cursor = connection.cursor()
25 cursor.execute("""CREATE TABLE gargleblatz
35 self.connection.close()
38 def set_up_dummy_table(self, connection=None):
39 close_connection = False
40 if connection is None:
41 close_connection = True
42 if self.connection is None:
43 connection = self._dbmodule.connect(**self._auth)
45 connection = self.connection
47 cursor = connection.cursor()
48 cursor.execute(self.dummy_table_sql)
60 class DBConnectionPool(DBTester):
61 __test__ = False # so that nose doesn't try to execute this directly
64 super(DBConnectionPool, self).setUp()
65 self.pool = self.create_pool()
66 self.connection = self.pool.get()
70 self.pool.put(self.connection)
72 super(DBConnectionPool, self).tearDown()
74 def assert_cursor_works(self, cursor):
75 cursor.execute("select 1")
76 rows = cursor.fetchall()
79 def test_connecting(self):
80 assert self.connection is not None
82 def test_create_cursor(self):
83 cursor = self.connection.cursor()
86 def test_run_query(self):
87 cursor = self.connection.cursor()
88 self.assert_cursor_works(cursor)
91 def test_run_bad_query(self):
92 cursor = self.connection.cursor()
94 cursor.execute("garbage blah blah")
96 except AssertionError:
102 def test_put_none(self):
103 # the pool is of size 1, and its only connection is out
104 assert self.pool.free() == 0
106 # ha ha we fooled it into thinking that we had a dead process
107 assert self.pool.free() == 1
108 conn2 = self.pool.get()
109 assert conn2 is not None
113 def test_close_does_a_put(self):
114 assert self.pool.free() == 0
115 self.connection.close()
116 assert self.pool.free() == 1
117 self.assertRaises(AttributeError, self.connection.cursor)
119 def test_put_doesnt_double_wrap(self):
120 self.pool.put(self.connection)
121 conn = self.pool.get()
122 assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
126 assert self.connection
127 self.connection.close()
128 assert not self.connection
130 def fill_up_table(self, conn):
132 for i in six.moves.range(1000):
133 curs.execute('insert into test_table (value_int) values (%s)' % i)
136 def test_returns_immediately(self):
137 self.pool = self.create_pool()
138 conn = self.pool.get()
139 self.set_up_dummy_table(conn)
140 self.fill_up_table(conn)
143 SHORT_QUERY = "select * from test_table"
147 self.assert_cursor_works(curs)
148 curs.execute(SHORT_QUERY)
151 eventlet.spawn(a_query)
153 self.assertEqual([1], results)
155 self.assertEqual([1, 2], results)
158 def test_connection_is_clean_after_put(self):
159 self.pool = self.create_pool()
160 conn = self.pool.get()
161 self.set_up_dummy_table(conn)
164 curs.execute('insert into test_table (value_int) values (%s)' % i)
168 conn2 = self.pool.get()
169 curs2 = conn2.cursor()
171 curs2.execute('insert into test_table (value_int) values (%s)' % i)
173 curs2.execute("select * from test_table")
174 # we should have only inserted them once
175 self.assertEqual(10, curs2.rowcount)
178 def test_visibility_from_other_connections(self):
179 self.pool = self.create_pool(max_size=3)
180 conn = self.pool.get()
181 conn2 = self.pool.get()
184 curs2 = conn2.cursor()
185 curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
186 self.assertEqual(curs2.rowcount, 1)
188 selection_query = "select * from gargleblatz"
189 curs2.execute(selection_query)
190 self.assertEqual(curs2.rowcount, 1)
193 # create a new connection, it should see the addition
194 conn3 = self.pool.get()
195 curs3 = conn3.cursor()
196 curs3.execute(selection_query)
197 self.assertEqual(curs3.rowcount, 1)
198 # now, does the already-open connection see it?
199 curs.execute(selection_query)
200 self.assertEqual(curs.rowcount, 1)
204 curs.execute("delete from gargleblatz where a=314159")
208 def test_clear(self):
209 self.pool = self.create_pool()
210 self.pool.put(self.connection)
212 self.assertEqual(len(self.pool.free_items), 0)
214 def test_clear_warmup(self):
215 """Clear implicitly created connections (min_size > 0)"""
216 self.pool = self.create_pool(min_size=1)
218 self.assertEqual(len(self.pool.free_items), 0)
220 def test_unwrap_connection(self):
221 self.assert_(isinstance(self.connection,
222 db_pool.GenericConnectionWrapper))
223 conn = self.pool._unwrap_connection(self.connection)
224 assert not isinstance(conn, db_pool.GenericConnectionWrapper)
226 self.assertEqual(None, self.pool._unwrap_connection(None))
227 self.assertEqual(None, self.pool._unwrap_connection(1))
229 # testing duck typing here -- as long as the connection has a
230 # _base attribute, it should be unwrappable
233 self.assertEqual('hi', self.pool._unwrap_connection(x))
236 def test_safe_close(self):
237 self.pool._safe_close(self.connection, quiet=True)
238 self.assertEqual(len(self.pool.free_items), 1)
240 self.pool._safe_close(None)
241 self.pool._safe_close(1)
243 # now we're really going for 100% coverage
247 raise KeyboardInterrupt()
249 self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
254 raise RuntimeError("if this line has been printed, the test succeeded")
256 self.pool._safe_close(x, quiet=False)
258 def test_zero_max_idle(self):
259 self.pool.put(self.connection)
261 self.pool = self.create_pool(max_size=2, max_idle=0)
262 self.connection = self.pool.get()
263 self.connection.close()
264 self.assertEqual(len(self.pool.free_items), 0)
266 def test_zero_max_age(self):
267 self.pool.put(self.connection)
269 self.pool = self.create_pool(max_size=2, max_age=0)
270 self.connection = self.pool.get()
271 self.connection.close()
272 self.assertEqual(len(self.pool.free_items), 0)
274 def test_waiters_get_woken(self):
275 # verify that when there's someone waiting on an empty pool
276 # and someone puts an immediately-closed connection back in
277 # the pool that the waiter gets woken
278 self.pool.put(self.connection)
280 self.pool = self.create_pool(max_size=1, max_age=0)
282 self.connection = self.pool.get()
283 self.assertEqual(self.pool.free(), 0)
284 self.assertEqual(self.pool.waiting(), 0)
287 def retrieve(pool, ev):
290 eventlet.spawn(retrieve, self.pool, e)
291 eventlet.sleep(0) # these two sleeps should advance the retrieve
292 eventlet.sleep(0) # coroutine until it's waiting in get()
293 self.assertEqual(self.pool.free(), 0)
294 self.assertEqual(self.pool.waiting(), 1)
295 self.pool.put(self.connection)
296 timer = eventlet.Timeout(1)
299 self.assertEqual(self.pool.free(), 0)
300 self.assertEqual(self.pool.waiting(), 0)
def test_raising_create(self):
    # When creating a connection raises, the pool must not lose capacity:
    # after the failed get() its single slot should be reported free again.
    failing_module = RaisingDBModule()
    self.pool = self.create_pool(max_size=1, module=failing_module)
    self.assertRaises(RuntimeError, self.pool.get)
    free_slots = self.pool.free()
    self.assertEqual(free_slots, 1)
311 class DummyConnection(object):
class DummyDBModule(object):
    """Stand-in for a DB-API module that needs no real database."""

    def connect(self, *args, **kwargs):
        """Ignore all arguments and return a new ``DummyConnection``."""
        connection = DummyConnection()
        return connection
321 class RaisingDBModule(object):
322 def connect(self, *args, **kw):
326 class TpoolConnectionPool(DBConnectionPool):
327 __test__ = False # so that nose doesn't try to execute this directly
329 def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
330 connect_timeout=0.5, module=None):
332 module = self._dbmodule
333 return db_pool.TpooledConnectionPool(
335 min_size=min_size, max_size=max_size,
336 max_idle=max_idle, max_age=max_age,
337 connect_timeout=connect_timeout,
342 super(TpoolConnectionPool, self).setUp()
345 super(TpoolConnectionPool, self).tearDown()
346 from eventlet import tpool
350 class RawConnectionPool(DBConnectionPool):
351 __test__ = False # so that nose doesn't try to execute this directly
353 def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
354 connect_timeout=0.5, module=None):
356 module = self._dbmodule
357 return db_pool.RawConnectionPool(
359 min_size=min_size, max_size=max_size,
360 max_idle=max_idle, max_age=max_age,
361 connect_timeout=connect_timeout,
365 def test_raw_pool_issue_125():
366 # pool = self.create_pool(min_size=3, max_size=5)
367 pool = db_pool.RawConnectionPool(
369 dsn="dbname=test user=jessica port=5433",
370 min_size=3, max_size=5)
375 def test_raw_pool_custom_cleanup_ok():
376 cleanup_mock = mock.Mock()
377 pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
380 assert cleanup_mock.call_count == 1
382 with pool.item() as conn:
384 assert cleanup_mock.call_count == 2
387 def test_raw_pool_custom_cleanup_arg_error():
388 cleanup_mock = mock.Mock(side_effect=NotImplementedError)
389 pool = db_pool.RawConnectionPool(DummyDBModule())
391 pool.put(conn, cleanup=cleanup_mock)
392 assert cleanup_mock.call_count == 1
394 with pool.item(cleanup=cleanup_mock):
396 assert cleanup_mock.call_count == 2
399 def test_raw_pool_custom_cleanup_fatal():
404 raise KeyboardInterrupt
406 pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
410 except KeyboardInterrupt:
413 assert False, 'Expected KeyboardInterrupt'
417 def test_raw_pool_clear_update_current_size():
418 # https://github.com/eventlet/eventlet/issues/139
419 # BaseConnectionPool.clear does not update .current_size.
420 # That leads to situation when new connections could not be created.
421 pool = db_pool.RawConnectionPool(DummyDBModule())
423 assert pool.current_size == 1
424 assert len(pool.free_items) == 1
426 assert pool.current_size == 0
427 assert len(pool.free_items) == 0
430 get_auth = get_database_auth
433 def mysql_requirement(_f):
434 verbose = os.environ.get('eventlet_test_mysql_verbose')
438 auth = get_auth()['MySQLdb'].copy()
439 MySQLdb.connect(**auth)
441 except MySQLdb.OperationalError:
443 print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
444 traceback.print_exc()
448 print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
452 class MysqlConnectionPool(object):
453 dummy_table_sql = """CREATE TEMPORARY TABLE test_table
455 row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
458 value_string VARCHAR(200),
461 value_binary_string VARCHAR(200) BINARY,
462 value_enum ENUM('Y','N'),
466 @skip_unless(mysql_requirement)
469 self._dbmodule = MySQLdb
470 self._auth = get_auth()['MySQLdb']
471 super(MysqlConnectionPool, self).setUp()
474 super(MysqlConnectionPool, self).tearDown()
477 auth = self._auth.copy()
482 dbname = 'test%s' % os.getpid()
483 db = self._dbmodule.connect(**auth).cursor()
484 db.execute("create database " + dbname)
486 self._auth['db'] = dbname
490 db = self._dbmodule.connect(**self._auth).cursor()
491 db.execute("drop database " + self._auth['db'])
496 class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
500 class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
504 def postgres_requirement(_f):
508 auth = get_auth()['psycopg2'].copy()
509 psycopg2.connect(**auth)
511 except psycopg2.OperationalError:
512 print("Skipping postgres tests, error when connecting")
515 print("Skipping postgres tests, psycopg2 not importable")
519 class Psycopg2ConnectionPool(object):
520 dummy_table_sql = """CREATE TEMPORARY TABLE test_table
522 row_id SERIAL PRIMARY KEY,
525 value_string VARCHAR(200),
528 value_binary_string BYTEA,
532 @skip_unless(postgres_requirement)
535 self._dbmodule = psycopg2
536 self._auth = get_auth()['psycopg2']
537 super(Psycopg2ConnectionPool, self).setUp()
540 super(Psycopg2ConnectionPool, self).tearDown()
543 dbname = 'test%s' % os.getpid()
544 self._auth['database'] = dbname
549 auth = self._auth.copy()
550 auth.pop('database') # can't create if you're connecting to it
551 conn = self._dbmodule.connect(**auth)
552 conn.set_isolation_level(0)
554 db.execute("create database " + dbname)
559 auth = self._auth.copy()
560 auth.pop('database') # can't drop database we connected to
561 conn = self._dbmodule.connect(**auth)
562 conn.set_isolation_level(0)
564 db.execute("drop database " + self._auth['database'])
569 class TestPsycopg2Base(TestCase):
572 def test_cursor_works_as_context_manager(self):
573 with self.connection.cursor() as c:
574 c.execute('select 1')
579 class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
583 class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
587 if __name__ == '__main__':