Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / db_pool_test.py
1 '''Test cases for db_pool
2 '''
3 from __future__ import print_function
4
5 import sys
6 import os
7 import traceback
8 from unittest import TestCase, main
9
10 from tests import mock, skip_unless, skip_with_pyevent, get_database_auth
11 from eventlet import event
12 from eventlet import db_pool
13 from eventlet.support import six
14 import eventlet
15
16
class DBTester(object):
    """Mixin providing the database fixture used by the pool tests.

    Concrete subclasses are expected to supply ``_dbmodule`` (the DB-API
    module), ``_auth`` (keyword arguments for ``_dbmodule.connect``),
    ``dummy_table_sql`` and the ``create_db`` / ``drop_db`` hooks.
    """
    __test__ = False  # so that nose doesn't try to execute this directly

    def setUp(self):
        """Create the test database and the ``gargleblatz`` table in it."""
        self.create_db()
        self.connection = None
        connection = self._dbmodule.connect(**self._auth)
        # Release the cursor and the connection even when the DDL fails, so
        # a broken fixture does not leave a session open.
        try:
            cursor = connection.cursor()
            try:
                cursor.execute("""CREATE  TABLE gargleblatz
        (
        a INTEGER
        );""")
                connection.commit()
            finally:
                cursor.close()
        finally:
            connection.close()

    def tearDown(self):
        """Close ``self.connection`` if it is still truthy, then drop the DB."""
        if self.connection:
            self.connection.close()
        self.drop_db()

    def set_up_dummy_table(self, connection=None):
        """Execute ``self.dummy_table_sql`` and commit.

        :param connection: connection to run the statement on.  When omitted,
            ``self.connection`` is used if set; otherwise a temporary
            connection is opened and closed again before returning.
        """
        # Only a connection opened right here is ours to close; a connection
        # passed in by the caller, or the shared self.connection, stays open.
        owns_connection = False
        if connection is None:
            if self.connection is None:
                connection = self._dbmodule.connect(**self._auth)
                owns_connection = True
            else:
                connection = self.connection

        try:
            cursor = connection.cursor()
            try:
                cursor.execute(self.dummy_table_sql)
                connection.commit()
            finally:
                cursor.close()
        finally:
            if owns_connection:
                connection.close()
53
54
55 # silly mock class
class Mock(object):
    """Empty attribute holder; tests attach whatever attributes they need."""
58
59
class DBConnectionPool(DBTester):
    """Pool-agnostic test suite shared by the concrete pool test classes.

    Subclasses must provide ``create_pool(**kwargs)``.  ``setUp`` builds one
    pool with it and keeps a checked-out connection in ``self.connection``.
    """
    __test__ = False  # so that nose doesn't try to execute this directly

    def setUp(self):
        """Build the database fixture, a pool, and check one connection out."""
        super(DBConnectionPool, self).setUp()
        self.pool = self.create_pool()
        self.connection = self.pool.get()

    def tearDown(self):
        """Return the held connection, clear the pool, tear the fixture down."""
        if self.connection:
            self.pool.put(self.connection)
        self.pool.clear()
        super(DBConnectionPool, self).tearDown()

    def assert_cursor_works(self, cursor):
        """Run ``select 1`` on ``cursor`` and require a non-empty result."""
        cursor.execute("select 1")
        rows = cursor.fetchall()
        assert rows

    def test_connecting(self):
        assert self.connection is not None

    def test_create_cursor(self):
        cursor = self.connection.cursor()
        cursor.close()

    def test_run_query(self):
        cursor = self.connection.cursor()
        self.assert_cursor_works(cursor)
        cursor.close()

    def test_run_bad_query(self):
        cursor = self.connection.cursor()
        try:
            cursor.execute("garbage blah blah")
            # Reaching this line means the bad statement was accepted.
            assert False
        except AssertionError:
            raise
        except Exception:
            # Any driver error is the expected outcome.
            pass
        cursor.close()

    def test_put_none(self):
        # the pool is of size 1, and its only connection is out
        assert self.pool.free() == 0
        self.pool.put(None)
        # ha ha we fooled it into thinking that we had a dead process
        assert self.pool.free() == 1
        conn2 = self.pool.get()
        assert conn2 is not None
        assert conn2.cursor
        self.pool.put(conn2)

    def test_close_does_a_put(self):
        assert self.pool.free() == 0
        self.connection.close()
        assert self.pool.free() == 1
        # After close() the wrapper no longer exposes the connection API.
        self.assertRaises(AttributeError, self.connection.cursor)

    def test_put_doesnt_double_wrap(self):
        self.pool.put(self.connection)
        conn = self.pool.get()
        assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
        self.pool.put(conn)

    def test_bool(self):
        assert self.connection
        self.connection.close()
        assert not self.connection

    def fill_up_table(self, conn):
        """Insert the integers 0..999 into ``test_table`` and commit."""
        curs = conn.cursor()
        try:
            for i in six.moves.range(1000):
                curs.execute('insert into test_table (value_int) values (%s)' % i)
            conn.commit()
        finally:
            # The cursor is not needed by callers; do not leave it open.
            curs.close()

    def test_returns_immediately(self):
        # NOTE(review): this replaces the pool created in setUp without
        # clearing it first; tearDown then operates on the new pool.
        self.pool = self.create_pool()
        conn = self.pool.get()
        self.set_up_dummy_table(conn)
        self.fill_up_table(conn)
        curs = conn.cursor()
        results = []
        SHORT_QUERY = "select * from test_table"
        evt = event.Event()

        def a_query():
            self.assert_cursor_works(curs)
            curs.execute(SHORT_QUERY)
            results.append(2)
            evt.send()
        eventlet.spawn(a_query)
        # spawn() must hand control back before the query greenthread runs.
        results.append(1)
        self.assertEqual([1], results)
        evt.wait()
        self.assertEqual([1, 2], results)
        self.pool.put(conn)

    def test_connection_is_clean_after_put(self):
        self.pool = self.create_pool()
        conn = self.pool.get()
        self.set_up_dummy_table(conn)
        curs = conn.cursor()
        for i in range(10):
            curs.execute('insert into test_table (value_int) values (%s)' % i)
        # do not commit  :-)
        self.pool.put(conn)
        del conn
        conn2 = self.pool.get()
        curs2 = conn2.cursor()
        for i in range(10):
            curs2.execute('insert into test_table (value_int) values (%s)' % i)
        conn2.commit()
        curs2.execute("select * from test_table")
        # we should have only inserted them once
        self.assertEqual(10, curs2.rowcount)
        self.pool.put(conn2)

    def test_visibility_from_other_connections(self):
        self.pool = self.create_pool(max_size=3)
        conn = self.pool.get()
        conn2 = self.pool.get()
        curs = conn.cursor()
        try:
            curs2 = conn2.cursor()
            curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
            self.assertEqual(curs2.rowcount, 1)
            conn2.commit()
            selection_query = "select * from gargleblatz"
            curs2.execute(selection_query)
            self.assertEqual(curs2.rowcount, 1)
            del curs2
            self.pool.put(conn2)
            # create a new connection, it should see the addition
            conn3 = self.pool.get()
            curs3 = conn3.cursor()
            curs3.execute(selection_query)
            self.assertEqual(curs3.rowcount, 1)
            # now, does the already-open connection see it?
            curs.execute(selection_query)
            self.assertEqual(curs.rowcount, 1)
            self.pool.put(conn3)
        finally:
            # clean up my litter
            curs.execute("delete from gargleblatz where a=314159")
            conn.commit()
            self.pool.put(conn)

    def test_clear(self):
        self.pool = self.create_pool()
        self.pool.put(self.connection)
        self.pool.clear()
        self.assertEqual(len(self.pool.free_items), 0)

    def test_clear_warmup(self):
        """Clear implicitly created connections (min_size > 0)"""
        self.pool = self.create_pool(min_size=1)
        self.pool.clear()
        self.assertEqual(len(self.pool.free_items), 0)

    def test_unwrap_connection(self):
        # assertTrue replaces the deprecated assert_ alias, which newer
        # unittest versions no longer provide.
        self.assertTrue(isinstance(self.connection,
                                   db_pool.GenericConnectionWrapper))
        conn = self.pool._unwrap_connection(self.connection)
        assert not isinstance(conn, db_pool.GenericConnectionWrapper)

        self.assertEqual(None, self.pool._unwrap_connection(None))
        self.assertEqual(None, self.pool._unwrap_connection(1))

        # testing duck typing here -- as long as the connection has a
        # _base attribute, it should be unwrappable
        x = Mock()
        x._base = 'hi'
        self.assertEqual('hi', self.pool._unwrap_connection(x))
        conn.close()

    def test_safe_close(self):
        self.pool._safe_close(self.connection, quiet=True)
        self.assertEqual(len(self.pool.free_items), 1)

        # Objects without a usable close() must be tolerated.
        self.pool._safe_close(None)
        self.pool._safe_close(1)

        # now we're really going for 100% coverage
        x = Mock()

        def fail():
            raise KeyboardInterrupt()
        x.close = fail
        self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)

        x = Mock()

        def fail2():
            raise RuntimeError("if this line has been printed, the test succeeded")
        x.close = fail2
        self.pool._safe_close(x, quiet=False)

    def test_zero_max_idle(self):
        self.pool.put(self.connection)
        self.pool.clear()
        self.pool = self.create_pool(max_size=2, max_idle=0)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 0)

    def test_zero_max_age(self):
        self.pool.put(self.connection)
        self.pool.clear()
        self.pool = self.create_pool(max_size=2, max_age=0)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 0)

    def test_waiters_get_woken(self):
        # verify that when there's someone waiting on an empty pool
        # and someone puts an immediately-closed connection back in
        # the pool that the waiter gets woken
        self.pool.put(self.connection)
        self.pool.clear()
        self.pool = self.create_pool(max_size=1, max_age=0)

        self.connection = self.pool.get()
        self.assertEqual(self.pool.free(), 0)
        self.assertEqual(self.pool.waiting(), 0)
        e = event.Event()

        def retrieve(pool, ev):
            c = pool.get()
            ev.send(c)
        eventlet.spawn(retrieve, self.pool, e)
        eventlet.sleep(0)  # these two sleeps should advance the retrieve
        eventlet.sleep(0)  # coroutine until it's waiting in get()
        self.assertEqual(self.pool.free(), 0)
        self.assertEqual(self.pool.waiting(), 1)
        self.pool.put(self.connection)
        timer = eventlet.Timeout(1)
        try:
            conn = e.wait()
        finally:
            # Always disarm the timeout so it cannot fire in a later test.
            timer.cancel()
        self.assertEqual(self.pool.free(), 0)
        self.assertEqual(self.pool.waiting(), 0)
        self.pool.put(conn)

    def test_raising_create(self):
        # if the create() method raises an exception the pool should
        # not lose any connections
        self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
        self.assertRaises(RuntimeError, self.pool.get)
        self.assertEqual(self.pool.free(), 1)
309
310
class DummyConnection(object):
    """Stand-in connection whose only method is a do-nothing ``rollback``."""

    def rollback(self):
        """Accept the call and do nothing."""
        return None
314
315
class DummyDBModule(object):
    """Fake DB module: each ``connect`` call returns a new DummyConnection."""

    def connect(self, *args, **kwargs):
        """Ignore all arguments and hand back a fresh DummyConnection."""
        connection = DummyConnection()
        return connection
319
320
class RaisingDBModule(object):
    """Fake DB module whose ``connect`` always fails with RuntimeError."""

    def connect(self, *args, **kw):
        """Ignore all arguments and raise RuntimeError."""
        failure = RuntimeError()
        raise failure
324
325
class TpoolConnectionPool(DBConnectionPool):
    """Runs the shared suite against ``db_pool.TpooledConnectionPool``."""
    __test__ = False  # so that nose doesn't try to execute this directly

    def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
                    connect_timeout=0.5, module=None):
        """Return a TpooledConnectionPool built from the given settings.

        ``module`` falls back to ``self._dbmodule``; ``self._auth`` is
        forwarded as additional keyword arguments.
        """
        dbmodule = self._dbmodule if module is None else module
        return db_pool.TpooledConnectionPool(
            dbmodule,
            min_size=min_size,
            max_size=max_size,
            max_idle=max_idle,
            max_age=max_age,
            connect_timeout=connect_timeout,
            **self._auth
        )

    @skip_with_pyevent
    def setUp(self):
        # Exists only to carry the skip decorator; the work is inherited.
        super(TpoolConnectionPool, self).setUp()

    def tearDown(self):
        super(TpoolConnectionPool, self).tearDown()
        # Imported late, after the inherited teardown has completed.
        from eventlet import tpool
        tpool.killall()
346         from eventlet import tpool
347         tpool.killall()
348
349
class RawConnectionPool(DBConnectionPool):
    """Runs the shared suite against ``db_pool.RawConnectionPool``."""
    __test__ = False  # so that nose doesn't try to execute this directly

    def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
                    connect_timeout=0.5, module=None):
        """Return a RawConnectionPool built from the given settings.

        ``module`` falls back to ``self._dbmodule``; ``self._auth`` is
        forwarded as additional keyword arguments.
        """
        dbmodule = self._dbmodule if module is None else module
        return db_pool.RawConnectionPool(
            dbmodule,
            min_size=min_size,
            max_size=max_size,
            max_idle=max_idle,
            max_age=max_age,
            connect_timeout=connect_timeout,
            **self._auth
        )

363
364
def test_raw_pool_issue_125():
    # Regression check named after issue 125: a get/put round trip on a pool
    # created with a dsn keyword, min_size=3 and max_size=5.
    pool_options = {
        'dsn': "dbname=test user=jessica port=5433",
        'min_size': 3,
        'max_size': 5,
    }
    pool = db_pool.RawConnectionPool(DummyDBModule(), **pool_options)
    checked_out = pool.get()
    pool.put(checked_out)

373
374
def test_raw_pool_custom_cleanup_ok():
    # A cleanup callable given to the constructor is invoked once per
    # returned connection, for both put() and the item() context manager.
    cleanup_mock = mock.Mock()
    pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)

    checked_out = pool.get()
    pool.put(checked_out)
    assert cleanup_mock.call_count == 1

    with pool.item():
        pass
    assert cleanup_mock.call_count == 2
385
386
def test_raw_pool_custom_cleanup_arg_error():
    # A cleanup callable passed per call is invoked once each time, and the
    # NotImplementedError it raises must not escape put() or item().
    cleanup_mock = mock.Mock(side_effect=NotImplementedError)
    pool = db_pool.RawConnectionPool(DummyDBModule())

    checked_out = pool.get()
    pool.put(checked_out, cleanup=cleanup_mock)
    assert cleanup_mock.call_count == 1

    with pool.item(cleanup=cleanup_mock):
        pass
    assert cleanup_mock.call_count == 2
397
398
def test_raw_pool_custom_cleanup_fatal():
    # A KeyboardInterrupt raised by the cleanup callable must propagate out
    # of put(), and the callable must have run exactly once.
    calls = [0]  # one-element list so the nested function can mutate it

    def cleanup(conn):
        calls[0] += 1
        raise KeyboardInterrupt

    pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
    conn = pool.get()
    interrupted = False
    try:
        pool.put(conn)
    except KeyboardInterrupt:
        interrupted = True
    assert interrupted, 'Expected KeyboardInterrupt'
    assert calls[0] == 1
415
416
def test_raw_pool_clear_update_current_size():
    # https://github.com/eventlet/eventlet/issues/139
    # BaseConnectionPool.clear does not update .current_size.
    # That leads to situation when new connections could not be created.
    pool = db_pool.RawConnectionPool(DummyDBModule())
    connection = pool.get()
    connection.close()

    # One connection exists and sits idle in the pool.
    assert pool.current_size == 1
    assert len(pool.free_items) == 1

    pool.clear()

    # Both the idle list and the size counter must be reset.
    assert pool.current_size == 0
    assert len(pool.free_items) == 0

428
429
# Short alias for tests.get_database_auth; the requirement checks and the
# setUp methods in this module call it as get_auth().
get_auth = get_database_auth
431
432
def mysql_requirement(_f):
    """Report whether the MySQL tests can run.

    Used as the predicate for ``skip_unless``.  Returns True only when
    ``MySQLdb`` is importable and a connection with the configured
    credentials succeeds.  Diagnostics go to stderr, and only when the
    ``eventlet_test_mysql_verbose`` environment variable is set.

    :param _f: unused.
    :returns: True if the tests should run, False to skip them.
    """
    verbose = os.environ.get('eventlet_test_mysql_verbose')
    try:
        import MySQLdb
        try:
            auth = get_auth()['MySQLdb'].copy()
            # The connection is only a reachability probe; close it right
            # away instead of leaving the session open.
            MySQLdb.connect(**auth).close()
            return True
        except MySQLdb.OperationalError:
            if verbose:
                print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
                traceback.print_exc()
            return False
    except ImportError:
        if verbose:
            print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
        return False
450
451
class MysqlConnectionPool(object):
    """MySQLdb-specific fixture mixin: credentials, DDL and database hooks."""

    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
        (
        row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
        value_int INTEGER,
        value_float FLOAT,
        value_string VARCHAR(200),
        value_uuid CHAR(36),
        value_binary BLOB,
        value_binary_string VARCHAR(200) BINARY,
        value_enum ENUM('Y','N'),
        created TIMESTAMP
        ) ENGINE=InnoDB;"""

    @skip_unless(mysql_requirement)
    def setUp(self):
        """Bind MySQLdb and its credentials, then run the inherited setUp."""
        import MySQLdb
        self._dbmodule = MySQLdb
        self._auth = get_auth()['MySQLdb']
        super(MysqlConnectionPool, self).setUp()

    def tearDown(self):
        super(MysqlConnectionPool, self).tearDown()

    def _run_statement(self, auth, statement):
        """Execute one statement on a short-lived connection, then close it."""
        conn = self._dbmodule.connect(**auth)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(statement)
            finally:
                cursor.close()
        finally:
            # Previously only the cursor was closed and the connection leaked.
            conn.close()

    def create_db(self):
        """Create database ``test<pid>`` and record its name in ``_auth['db']``."""
        # Snapshot the credentials before 'db' is added, so the create
        # statement runs without selecting the database being created.
        auth = self._auth.copy()
        try:
            # Best effort: remove a database left behind by an earlier run.
            self.drop_db()
        except Exception:
            pass
        dbname = 'test%s' % os.getpid()
        self._run_statement(auth, "create database " + dbname)
        self._auth['db'] = dbname

    def drop_db(self):
        """Drop the database named by ``_auth['db']``.

        Raises KeyError when no database name has been recorded yet.
        """
        statement = "drop database " + self._auth['db']
        self._run_statement(self._auth, statement)
493         del db
494
495
class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
    """Shared pool suite run with MySQLdb through TpoolConnectionPool."""
    # Re-enable collection, which the mixin bases switch off.
    __test__ = True
498
499
class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
    """Shared pool suite run with MySQLdb through RawConnectionPool."""
    # Re-enable collection, which the mixin bases switch off.
    __test__ = True
502
503
def postgres_requirement(_f):
    """Report whether the PostgreSQL tests can run.

    Used as the predicate for ``skip_unless``.  Returns True only when
    ``psycopg2`` is importable and a connection with the configured
    credentials succeeds; otherwise prints the reason and returns False.

    :param _f: unused.
    :returns: True if the tests should run, False to skip them.
    """
    try:
        import psycopg2
        try:
            auth = get_auth()['psycopg2'].copy()
            # The connection is only a reachability probe; close it right
            # away instead of leaving the session open.
            psycopg2.connect(**auth).close()
            return True
        except psycopg2.OperationalError:
            print("Skipping postgres tests, error when connecting")
            return False
    except ImportError:
        print("Skipping postgres tests, psycopg2 not importable")
        return False
517
518
class Psycopg2ConnectionPool(object):
    """psycopg2-specific fixture mixin: credentials, DDL and database hooks."""

    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
        (
        row_id SERIAL PRIMARY KEY,
        value_int INTEGER,
        value_float FLOAT,
        value_string VARCHAR(200),
        value_uuid CHAR(36),
        value_binary BYTEA,
        value_binary_string BYTEA,
        created TIMESTAMP
        );"""

    @skip_unless(postgres_requirement)
    def setUp(self):
        """Bind psycopg2 and its credentials, then run the inherited setUp."""
        import psycopg2
        self._dbmodule = psycopg2
        self._auth = get_auth()['psycopg2']
        super(Psycopg2ConnectionPool, self).setUp()

    def tearDown(self):
        super(Psycopg2ConnectionPool, self).tearDown()

    def _execute_without_database(self, statement):
        """Run ``statement`` on a connection that omits the 'database' key."""
        # can't create or drop a database while connected to it
        admin_auth = self._auth.copy()
        admin_auth.pop('database')
        admin_conn = self._dbmodule.connect(**admin_auth)
        # NOTE(review): level 0 is presumably psycopg2's autocommit mode,
        # needed for create/drop database -- confirm against psycopg2 docs.
        admin_conn.set_isolation_level(0)
        cursor = admin_conn.cursor()
        cursor.execute(statement)
        cursor.close()
        admin_conn.close()

    def create_db(self):
        """Create database ``test<pid>`` after recording it in ``_auth``."""
        dbname = 'test%s' % os.getpid()
        self._auth['database'] = dbname
        try:
            # Best effort: remove a database left behind by an earlier run.
            self.drop_db()
        except Exception:
            pass
        self._execute_without_database("create database " + dbname)

    def drop_db(self):
        """Drop the database named by ``_auth['database']``."""
        self._execute_without_database(
            "drop database " + self._auth['database'])
565         db.close()
566         conn.close()
567
568
class TestPsycopg2Base(TestCase):
    """Extra checks that only apply to the psycopg2-backed test classes."""
    __test__ = False

    def test_cursor_works_as_context_manager(self):
        # The cursor from self.connection must be usable in a with-statement.
        with self.connection.cursor() as cursor:
            cursor.execute('select 1')
            first_row = cursor.fetchone()
            assert first_row == (1,)
577
578
class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
    """Shared pool suite run with psycopg2 through TpoolConnectionPool."""
    # Re-enable collection, which the base classes switch off.
    __test__ = True
581
582
class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
    """Shared pool suite run with psycopg2 through RawConnectionPool."""
    # Re-enable collection, which the base classes switch off.
    __test__ = True
585
586
# When executed as a script, hand control to unittest's main().
if __name__ == '__main__':
    main()