1 '''Test cases for db_pool
3 from __future__ import print_function
8 from unittest import TestCase, main
10 from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
11 from eventlet import event
12 from eventlet import db_pool
13 from eventlet.support import six
17 class DBTester(object):
18 __test__ = False # so that nose doesn't try to execute this directly
22 self.connection = None
23 connection = self._dbmodule.connect(**self._auth)
24 cursor = connection.cursor()
25 cursor.execute("""CREATE TABLE gargleblatz
35 self.connection.close()
38 def set_up_dummy_table(self, connection=None):
39 close_connection = False
40 if connection is None:
41 close_connection = True
42 if self.connection is None:
43 connection = self._dbmodule.connect(**self._auth)
45 connection = self.connection
47 cursor = connection.cursor()
48 cursor.execute(self.dummy_table_sql)
60 class DBConnectionPool(DBTester):
61 __test__ = False # so that nose doesn't try to execute this directly
64 super(DBConnectionPool, self).setUp()
65 self.pool = self.create_pool()
66 self.connection = self.pool.get()
70 self.pool.put(self.connection)
72 super(DBConnectionPool, self).tearDown()
74 def assert_cursor_works(self, cursor):
75 cursor.execute("select 1")
76 rows = cursor.fetchall()
79 def test_connecting(self):
80 assert self.connection is not None
82 def test_create_cursor(self):
83 cursor = self.connection.cursor()
def test_run_query(self):
    """Open a cursor on ``self.connection`` and check it via ``assert_cursor_works``."""
    cursor = self.connection.cursor()
    # Delegate the actual query check to the helper on this test class.
    self.assert_cursor_works(cursor)
91 def test_run_bad_query(self):
92 cursor = self.connection.cursor()
94 cursor.execute("garbage blah blah")
96 except AssertionError:
102 def test_put_none(self):
103 # the pool is of size 1, and its only connection is out
104 assert self.pool.free() == 0
106 # ha ha we fooled it into thinking that we had a dead process
107 assert self.pool.free() == 1
108 conn2 = self.pool.get()
109 assert conn2 is not None
113 def test_close_does_a_put(self):
114 assert self.pool.free() == 0
115 self.connection.close()
116 assert self.pool.free() == 1
117 self.assertRaises(AttributeError, self.connection.cursor)
120 def test_deletion_does_a_put(self):
121 # doing a put on del causes some issues if __del__ is called in the
122 # main coroutine, so, not doing that for now
123 assert self.pool.free() == 0
124 self.connection = None
125 assert self.pool.free() == 1
127 def test_put_doesnt_double_wrap(self):
128 self.pool.put(self.connection)
129 conn = self.pool.get()
130 assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
134 assert self.connection
135 self.connection.close()
136 assert not self.connection
138 def fill_up_table(self, conn):
140 for i in six.moves.range(1000):
141 curs.execute('insert into test_table (value_int) values (%s)' % i)
144 def test_returns_immediately(self):
145 self.pool = self.create_pool()
146 conn = self.pool.get()
147 self.set_up_dummy_table(conn)
148 self.fill_up_table(conn)
151 SHORT_QUERY = "select * from test_table"
155 self.assert_cursor_works(curs)
156 curs.execute(SHORT_QUERY)
159 eventlet.spawn(a_query)
161 self.assertEqual([1], results)
163 self.assertEqual([1, 2], results)
166 def test_connection_is_clean_after_put(self):
167 self.pool = self.create_pool()
168 conn = self.pool.get()
169 self.set_up_dummy_table(conn)
172 curs.execute('insert into test_table (value_int) values (%s)' % i)
176 conn2 = self.pool.get()
177 curs2 = conn2.cursor()
179 curs2.execute('insert into test_table (value_int) values (%s)' % i)
181 curs2.execute("select * from test_table")
182 # we should have only inserted them once
183 self.assertEqual(10, curs2.rowcount)
186 def test_visibility_from_other_connections(self):
187 self.pool = self.create_pool(max_size=3)
188 conn = self.pool.get()
189 conn2 = self.pool.get()
192 curs2 = conn2.cursor()
193 curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
194 self.assertEqual(curs2.rowcount, 1)
196 selection_query = "select * from gargleblatz"
197 curs2.execute(selection_query)
198 self.assertEqual(curs2.rowcount, 1)
201 # create a new connection, it should see the addition
202 conn3 = self.pool.get()
203 curs3 = conn3.cursor()
204 curs3.execute(selection_query)
205 self.assertEqual(curs3.rowcount, 1)
206 # now, does the already-open connection see it?
207 curs.execute(selection_query)
208 self.assertEqual(curs.rowcount, 1)
212 curs.execute("delete from gargleblatz where a=314159")
217 def test_two_simultaneous_connections(self):
218 # timing-sensitive test, disabled until we come up with a better
220 self.pool = self.create_pool(max_size=2)
221 conn = self.pool.get()
222 self.set_up_dummy_table(conn)
223 self.fill_up_table(conn)
225 conn2 = self.pool.get()
226 self.set_up_dummy_table(conn2)
227 self.fill_up_table(conn2)
228 curs2 = conn2.cursor()
230 LONG_QUERY = "select * from test_table"
231 SHORT_QUERY = "select * from test_table where row_id <= 20"
235 def long_running_query():
236 self.assert_cursor_works(curs)
237 curs.execute(LONG_QUERY)
242 def short_running_query():
243 self.assert_cursor_works(curs2)
244 curs2.execute(SHORT_QUERY)
248 eventlet.spawn(long_running_query)
249 eventlet.spawn(short_running_query)
253 self.assertEqual([1, 2], results)
255 def test_clear(self):
256 self.pool = self.create_pool()
257 self.pool.put(self.connection)
259 self.assertEqual(len(self.pool.free_items), 0)
261 def test_clear_warmup(self):
262 """Clear implicitly created connections (min_size > 0)"""
263 self.pool = self.create_pool(min_size=1)
265 self.assertEqual(len(self.pool.free_items), 0)
267 def test_unwrap_connection(self):
268 self.assert_(isinstance(self.connection,
269 db_pool.GenericConnectionWrapper))
270 conn = self.pool._unwrap_connection(self.connection)
271 assert not isinstance(conn, db_pool.GenericConnectionWrapper)
273 self.assertEqual(None, self.pool._unwrap_connection(None))
274 self.assertEqual(None, self.pool._unwrap_connection(1))
276 # testing duck typing here -- as long as the connection has a
277 # _base attribute, it should be unwrappable
280 self.assertEqual('hi', self.pool._unwrap_connection(x))
283 def test_safe_close(self):
284 self.pool._safe_close(self.connection, quiet=True)
285 self.assertEqual(len(self.pool.free_items), 1)
287 self.pool._safe_close(None)
288 self.pool._safe_close(1)
290 # now we're really going for 100% coverage
294 raise KeyboardInterrupt()
296 self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
301 raise RuntimeError("if this line has been printed, the test succeeded")
303 self.pool._safe_close(x, quiet=False)
305 def test_zero_max_idle(self):
306 self.pool.put(self.connection)
308 self.pool = self.create_pool(max_size=2, max_idle=0)
309 self.connection = self.pool.get()
310 self.connection.close()
311 self.assertEqual(len(self.pool.free_items), 0)
313 def test_zero_max_age(self):
314 self.pool.put(self.connection)
316 self.pool = self.create_pool(max_size=2, max_age=0)
317 self.connection = self.pool.get()
318 self.connection.close()
319 self.assertEqual(len(self.pool.free_items), 0)
322 def test_max_idle(self):
323 # This test is timing-sensitive. Rename the function without
324 # the "dont" to run it, but beware that it could fail or take
327 self.pool = self.create_pool(max_size=2, max_idle=0.02)
328 self.connection = self.pool.get()
329 self.connection.close()
330 self.assertEqual(len(self.pool.free_items), 1)
331 eventlet.sleep(0.01) # not long enough to trigger the idle timeout
332 self.assertEqual(len(self.pool.free_items), 1)
333 self.connection = self.pool.get()
334 self.connection.close()
335 self.assertEqual(len(self.pool.free_items), 1)
336 eventlet.sleep(0.01) # idle timeout should have fired but done nothing
337 self.assertEqual(len(self.pool.free_items), 1)
338 self.connection = self.pool.get()
339 self.connection.close()
340 self.assertEqual(len(self.pool.free_items), 1)
341 eventlet.sleep(0.03) # long enough to trigger idle timeout for real
342 self.assertEqual(len(self.pool.free_items), 0)
345 def test_max_idle_many(self):
346 # This test is timing-sensitive. Rename the function without
347 # the "dont" to run it, but beware that it could fail or take
350 self.pool = self.create_pool(max_size=2, max_idle=0.02)
351 self.connection, conn2 = self.pool.get(), self.pool.get()
352 self.connection.close()
354 self.assertEqual(len(self.pool.free_items), 1)
356 self.assertEqual(len(self.pool.free_items), 2)
357 eventlet.sleep(0.02) # trigger cleanup of conn1 but not conn2
358 self.assertEqual(len(self.pool.free_items), 1)
361 def test_max_age(self):
362 # This test is timing-sensitive. Rename the function without
363 # the "dont" to run it, but beware that it could fail or take
366 self.pool = self.create_pool(max_size=2, max_age=0.05)
367 self.connection = self.pool.get()
368 self.connection.close()
369 self.assertEqual(len(self.pool.free_items), 1)
370 eventlet.sleep(0.01) # not long enough to trigger the age timeout
371 self.assertEqual(len(self.pool.free_items), 1)
372 self.connection = self.pool.get()
373 self.connection.close()
374 self.assertEqual(len(self.pool.free_items), 1)
375 eventlet.sleep(0.05) # long enough to trigger age timeout
376 self.assertEqual(len(self.pool.free_items), 0)
379 def test_max_age_many(self):
380 # This test is timing-sensitive. Rename the function without
381 # the "dont" to run it, but beware that it could fail or take
384 self.pool = self.create_pool(max_size=2, max_age=0.15)
385 self.connection, conn2 = self.pool.get(), self.pool.get()
386 self.connection.close()
387 self.assertEqual(len(self.pool.free_items), 1)
388 eventlet.sleep(0) # not long enough to trigger the age timeout
389 self.assertEqual(len(self.pool.free_items), 1)
390 eventlet.sleep(0.2) # long enough to trigger age timeout
391 self.assertEqual(len(self.pool.free_items), 0)
392 conn2.close() # should not be added to the free items
393 self.assertEqual(len(self.pool.free_items), 0)
395 def test_waiters_get_woken(self):
396 # verify that when there's someone waiting on an empty pool
397 # and someone puts an immediately-closed connection back in
398 # the pool that the waiter gets woken
399 self.pool.put(self.connection)
401 self.pool = self.create_pool(max_size=1, max_age=0)
403 self.connection = self.pool.get()
404 self.assertEqual(self.pool.free(), 0)
405 self.assertEqual(self.pool.waiting(), 0)
408 def retrieve(pool, ev):
411 eventlet.spawn(retrieve, self.pool, e)
412 eventlet.sleep(0) # these two sleeps should advance the retrieve
413 eventlet.sleep(0) # coroutine until it's waiting in get()
414 self.assertEqual(self.pool.free(), 0)
415 self.assertEqual(self.pool.waiting(), 1)
416 self.pool.put(self.connection)
417 timer = eventlet.Timeout(1)
420 self.assertEqual(self.pool.free(), 0)
421 self.assertEqual(self.pool.waiting(), 0)
425 def test_0_straight_benchmark(self):
426 """ Benchmark; don't run unless you want to wait a while."""
429 c = self.connection.cursor()
430 self.connection.commit()
433 for i in six.moves.range(iterations):
434 c.execute('select 1')
442 results.append(end - start)
444 print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
445 iterations, sum(results) / len(results), results, type(self)))
def test_raising_create(self):
    """A connect call that raises must not cost the pool a slot."""
    # When create() raises, the pool is expected to keep its full capacity:
    # get() surfaces the RuntimeError and free() still reports one slot.
    pool = self.create_pool(max_size=1, module=RaisingDBModule())
    self.pool = pool
    self.assertRaises(RuntimeError, pool.get)
    self.assertEqual(pool.free(), 1)
455 class DummyConnection(object):
class DummyDBModule(object):
    """Minimal stand-in for a database module whose ``connect`` ignores its arguments."""

    def connect(self, *args, **kwargs):
        """Return a new DummyConnection, whatever arguments are given."""
        connection = DummyConnection()
        return connection
464 class RaisingDBModule(object):
465 def connect(self, *args, **kw):
469 class TpoolConnectionPool(DBConnectionPool):
470 __test__ = False # so that nose doesn't try to execute this directly
472 def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
473 connect_timeout=0.5, module=None):
475 module = self._dbmodule
476 return db_pool.TpooledConnectionPool(
478 min_size=min_size, max_size=max_size,
479 max_idle=max_idle, max_age=max_age,
480 connect_timeout=connect_timeout,
485 super(TpoolConnectionPool, self).setUp()
488 super(TpoolConnectionPool, self).tearDown()
489 from eventlet import tpool
493 class RawConnectionPool(DBConnectionPool):
494 __test__ = False # so that nose doesn't try to execute this directly
496 def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
497 connect_timeout=0.5, module=None):
499 module = self._dbmodule
500 return db_pool.RawConnectionPool(
502 min_size=min_size, max_size=max_size,
503 max_idle=max_idle, max_age=max_age,
504 connect_timeout=connect_timeout,
508 class TestRawConnectionPool(TestCase):
509 def test_issue_125(self):
510 # pool = self.create_pool(min_size=3, max_size=5)
511 pool = db_pool.RawConnectionPool(
513 dsn="dbname=test user=jessica port=5433",
514 min_size=3, max_size=5)
518 def test_custom_cleanup_ok(self):
519 cleanup_mock = mock.Mock()
520 pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
523 assert cleanup_mock.call_count == 1
525 with pool.item() as conn:
527 assert cleanup_mock.call_count == 2
529 def test_custom_cleanup_arg_error(self):
530 cleanup_mock = mock.Mock(side_effect=NotImplementedError)
531 pool = db_pool.RawConnectionPool(DummyDBModule())
533 pool.put(conn, cleanup=cleanup_mock)
534 assert cleanup_mock.call_count == 1
536 with pool.item(cleanup=cleanup_mock):
538 assert cleanup_mock.call_count == 2
540 def test_custom_cleanup_fatal(self):
545 raise KeyboardInterrupt
547 pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
551 except KeyboardInterrupt:
554 assert False, 'Expected KeyboardInterrupt'
558 get_auth = get_database_auth
561 def mysql_requirement(_f):
562 verbose = os.environ.get('eventlet_test_mysql_verbose')
566 auth = get_auth()['MySQLdb'].copy()
567 MySQLdb.connect(**auth)
569 except MySQLdb.OperationalError:
571 print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
572 traceback.print_exc()
576 print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
580 class MysqlConnectionPool(object):
581 dummy_table_sql = """CREATE TEMPORARY TABLE test_table
583 row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
586 value_string VARCHAR(200),
589 value_binary_string VARCHAR(200) BINARY,
590 value_enum ENUM('Y','N'),
594 @skip_unless(mysql_requirement)
597 self._dbmodule = MySQLdb
598 self._auth = get_auth()['MySQLdb']
599 super(MysqlConnectionPool, self).setUp()
602 super(MysqlConnectionPool, self).tearDown()
605 auth = self._auth.copy()
610 dbname = 'test%s' % os.getpid()
611 db = self._dbmodule.connect(**auth).cursor()
612 db.execute("create database " + dbname)
614 self._auth['db'] = dbname
618 db = self._dbmodule.connect(**self._auth).cursor()
619 db.execute("drop database " + self._auth['db'])
624 class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
628 class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
632 def postgres_requirement(_f):
636 auth = get_auth()['psycopg2'].copy()
637 psycopg2.connect(**auth)
639 except psycopg2.OperationalError:
640 print("Skipping postgres tests, error when connecting")
643 print("Skipping postgres tests, psycopg2 not importable")
647 class Psycopg2ConnectionPool(object):
648 dummy_table_sql = """CREATE TEMPORARY TABLE test_table
650 row_id SERIAL PRIMARY KEY,
653 value_string VARCHAR(200),
656 value_binary_string BYTEA,
660 @skip_unless(postgres_requirement)
663 self._dbmodule = psycopg2
664 self._auth = get_auth()['psycopg2']
665 super(Psycopg2ConnectionPool, self).setUp()
668 super(Psycopg2ConnectionPool, self).tearDown()
671 dbname = 'test%s' % os.getpid()
672 self._auth['database'] = dbname
677 auth = self._auth.copy()
678 auth.pop('database') # can't create if you're connecting to it
679 conn = self._dbmodule.connect(**auth)
680 conn.set_isolation_level(0)
682 db.execute("create database " + dbname)
687 auth = self._auth.copy()
688 auth.pop('database') # can't drop database we connected to
689 conn = self._dbmodule.connect(**auth)
690 conn.set_isolation_level(0)
692 db.execute("drop database " + self._auth['database'])
697 class TestPsycopg2Base(TestCase):
700 def test_cursor_works_as_context_manager(self):
701 with self.connection.cursor() as c:
702 c.execute('select 1')
707 class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
711 class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
715 if __name__ == '__main__':