Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / tests / db_pool_test.py
1 '''Test cases for db_pool
2 '''
3 from __future__ import print_function
4
5 import sys
6 import os
7 import traceback
8 from unittest import TestCase, main
9
10 from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
11 from eventlet import event
12 from eventlet import db_pool
13 from eventlet.support import six
14 import eventlet
15
16
17 class DBTester(object):
18     __test__ = False  # so that nose doesn't try to execute this directly
19
20     def setUp(self):
21         self.create_db()
22         self.connection = None
23         connection = self._dbmodule.connect(**self._auth)
24         cursor = connection.cursor()
25         cursor.execute("""CREATE  TABLE gargleblatz
26         (
27         a INTEGER
28         );""")
29         connection.commit()
30         cursor.close()
31         connection.close()
32
33     def tearDown(self):
34         if self.connection:
35             self.connection.close()
36         self.drop_db()
37
38     def set_up_dummy_table(self, connection=None):
39         close_connection = False
40         if connection is None:
41             close_connection = True
42             if self.connection is None:
43                 connection = self._dbmodule.connect(**self._auth)
44             else:
45                 connection = self.connection
46
47         cursor = connection.cursor()
48         cursor.execute(self.dummy_table_sql)
49         connection.commit()
50         cursor.close()
51         if close_connection:
52             connection.close()
53
54
55 # silly mock class
56 class Mock(object):
57     pass
58
59
60 class DBConnectionPool(DBTester):
61     __test__ = False  # so that nose doesn't try to execute this directly
62
63     def setUp(self):
64         super(DBConnectionPool, self).setUp()
65         self.pool = self.create_pool()
66         self.connection = self.pool.get()
67
68     def tearDown(self):
69         if self.connection:
70             self.pool.put(self.connection)
71         self.pool.clear()
72         super(DBConnectionPool, self).tearDown()
73
74     def assert_cursor_works(self, cursor):
75         cursor.execute("select 1")
76         rows = cursor.fetchall()
77         assert rows
78
79     def test_connecting(self):
80         assert self.connection is not None
81
82     def test_create_cursor(self):
83         cursor = self.connection.cursor()
84         cursor.close()
85
86     def test_run_query(self):
87         cursor = self.connection.cursor()
88         self.assert_cursor_works(cursor)
89         cursor.close()
90
91     def test_run_bad_query(self):
92         cursor = self.connection.cursor()
93         try:
94             cursor.execute("garbage blah blah")
95             assert False
96         except AssertionError:
97             raise
98         except Exception:
99             pass
100         cursor.close()
101
102     def test_put_none(self):
103         # the pool is of size 1, and its only connection is out
104         assert self.pool.free() == 0
105         self.pool.put(None)
106         # ha ha we fooled it into thinking that we had a dead process
107         assert self.pool.free() == 1
108         conn2 = self.pool.get()
109         assert conn2 is not None
110         assert conn2.cursor
111         self.pool.put(conn2)
112
113     def test_close_does_a_put(self):
114         assert self.pool.free() == 0
115         self.connection.close()
116         assert self.pool.free() == 1
117         self.assertRaises(AttributeError, self.connection.cursor)
118
119     @skipped
120     def test_deletion_does_a_put(self):
121         # doing a put on del causes some issues if __del__ is called in the
122         # main coroutine, so, not doing that for now
123         assert self.pool.free() == 0
124         self.connection = None
125         assert self.pool.free() == 1
126
127     def test_put_doesnt_double_wrap(self):
128         self.pool.put(self.connection)
129         conn = self.pool.get()
130         assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
131         self.pool.put(conn)
132
133     def test_bool(self):
134         assert self.connection
135         self.connection.close()
136         assert not self.connection
137
138     def fill_up_table(self, conn):
139         curs = conn.cursor()
140         for i in six.moves.range(1000):
141             curs.execute('insert into test_table (value_int) values (%s)' % i)
142         conn.commit()
143
144     def test_returns_immediately(self):
145         self.pool = self.create_pool()
146         conn = self.pool.get()
147         self.set_up_dummy_table(conn)
148         self.fill_up_table(conn)
149         curs = conn.cursor()
150         results = []
151         SHORT_QUERY = "select * from test_table"
152         evt = event.Event()
153
154         def a_query():
155             self.assert_cursor_works(curs)
156             curs.execute(SHORT_QUERY)
157             results.append(2)
158             evt.send()
159         eventlet.spawn(a_query)
160         results.append(1)
161         self.assertEqual([1], results)
162         evt.wait()
163         self.assertEqual([1, 2], results)
164         self.pool.put(conn)
165
166     def test_connection_is_clean_after_put(self):
167         self.pool = self.create_pool()
168         conn = self.pool.get()
169         self.set_up_dummy_table(conn)
170         curs = conn.cursor()
171         for i in range(10):
172             curs.execute('insert into test_table (value_int) values (%s)' % i)
173         # do not commit  :-)
174         self.pool.put(conn)
175         del conn
176         conn2 = self.pool.get()
177         curs2 = conn2.cursor()
178         for i in range(10):
179             curs2.execute('insert into test_table (value_int) values (%s)' % i)
180         conn2.commit()
181         curs2.execute("select * from test_table")
182         # we should have only inserted them once
183         self.assertEqual(10, curs2.rowcount)
184         self.pool.put(conn2)
185
186     def test_visibility_from_other_connections(self):
187         self.pool = self.create_pool(max_size=3)
188         conn = self.pool.get()
189         conn2 = self.pool.get()
190         curs = conn.cursor()
191         try:
192             curs2 = conn2.cursor()
193             curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
194             self.assertEqual(curs2.rowcount, 1)
195             conn2.commit()
196             selection_query = "select * from gargleblatz"
197             curs2.execute(selection_query)
198             self.assertEqual(curs2.rowcount, 1)
199             del curs2
200             self.pool.put(conn2)
201             # create a new connection, it should see the addition
202             conn3 = self.pool.get()
203             curs3 = conn3.cursor()
204             curs3.execute(selection_query)
205             self.assertEqual(curs3.rowcount, 1)
206             # now, does the already-open connection see it?
207             curs.execute(selection_query)
208             self.assertEqual(curs.rowcount, 1)
209             self.pool.put(conn3)
210         finally:
211             # clean up my litter
212             curs.execute("delete from gargleblatz where a=314159")
213             conn.commit()
214             self.pool.put(conn)
215
216     @skipped
217     def test_two_simultaneous_connections(self):
218         # timing-sensitive test, disabled until we come up with a better
219         # way to do this
220         self.pool = self.create_pool(max_size=2)
221         conn = self.pool.get()
222         self.set_up_dummy_table(conn)
223         self.fill_up_table(conn)
224         curs = conn.cursor()
225         conn2 = self.pool.get()
226         self.set_up_dummy_table(conn2)
227         self.fill_up_table(conn2)
228         curs2 = conn2.cursor()
229         results = []
230         LONG_QUERY = "select * from test_table"
231         SHORT_QUERY = "select * from test_table where row_id <= 20"
232
233         evt = event.Event()
234
235         def long_running_query():
236             self.assert_cursor_works(curs)
237             curs.execute(LONG_QUERY)
238             results.append(1)
239             evt.send()
240         evt2 = event.Event()
241
242         def short_running_query():
243             self.assert_cursor_works(curs2)
244             curs2.execute(SHORT_QUERY)
245             results.append(2)
246             evt2.send()
247
248         eventlet.spawn(long_running_query)
249         eventlet.spawn(short_running_query)
250         evt.wait()
251         evt2.wait()
252         results.sort()
253         self.assertEqual([1, 2], results)
254
255     def test_clear(self):
256         self.pool = self.create_pool()
257         self.pool.put(self.connection)
258         self.pool.clear()
259         self.assertEqual(len(self.pool.free_items), 0)
260
261     def test_clear_warmup(self):
262         """Clear implicitly created connections (min_size > 0)"""
263         self.pool = self.create_pool(min_size=1)
264         self.pool.clear()
265         self.assertEqual(len(self.pool.free_items), 0)
266
267     def test_unwrap_connection(self):
268         self.assert_(isinstance(self.connection,
269                                 db_pool.GenericConnectionWrapper))
270         conn = self.pool._unwrap_connection(self.connection)
271         assert not isinstance(conn, db_pool.GenericConnectionWrapper)
272
273         self.assertEqual(None, self.pool._unwrap_connection(None))
274         self.assertEqual(None, self.pool._unwrap_connection(1))
275
276         # testing duck typing here -- as long as the connection has a
277         # _base attribute, it should be unwrappable
278         x = Mock()
279         x._base = 'hi'
280         self.assertEqual('hi', self.pool._unwrap_connection(x))
281         conn.close()
282
283     def test_safe_close(self):
284         self.pool._safe_close(self.connection, quiet=True)
285         self.assertEqual(len(self.pool.free_items), 1)
286
287         self.pool._safe_close(None)
288         self.pool._safe_close(1)
289
290         # now we're really going for 100% coverage
291         x = Mock()
292
293         def fail():
294             raise KeyboardInterrupt()
295         x.close = fail
296         self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
297
298         x = Mock()
299
300         def fail2():
301             raise RuntimeError("if this line has been printed, the test succeeded")
302         x.close = fail2
303         self.pool._safe_close(x, quiet=False)
304
305     def test_zero_max_idle(self):
306         self.pool.put(self.connection)
307         self.pool.clear()
308         self.pool = self.create_pool(max_size=2, max_idle=0)
309         self.connection = self.pool.get()
310         self.connection.close()
311         self.assertEqual(len(self.pool.free_items), 0)
312
313     def test_zero_max_age(self):
314         self.pool.put(self.connection)
315         self.pool.clear()
316         self.pool = self.create_pool(max_size=2, max_age=0)
317         self.connection = self.pool.get()
318         self.connection.close()
319         self.assertEqual(len(self.pool.free_items), 0)
320
321     @skipped
322     def test_max_idle(self):
323         # This test is timing-sensitive.  Rename the function without
324         # the "dont" to run it, but beware that it could fail or take
325         # a while.
326
327         self.pool = self.create_pool(max_size=2, max_idle=0.02)
328         self.connection = self.pool.get()
329         self.connection.close()
330         self.assertEqual(len(self.pool.free_items), 1)
331         eventlet.sleep(0.01)  # not long enough to trigger the idle timeout
332         self.assertEqual(len(self.pool.free_items), 1)
333         self.connection = self.pool.get()
334         self.connection.close()
335         self.assertEqual(len(self.pool.free_items), 1)
336         eventlet.sleep(0.01)  # idle timeout should have fired but done nothing
337         self.assertEqual(len(self.pool.free_items), 1)
338         self.connection = self.pool.get()
339         self.connection.close()
340         self.assertEqual(len(self.pool.free_items), 1)
341         eventlet.sleep(0.03)  # long enough to trigger idle timeout for real
342         self.assertEqual(len(self.pool.free_items), 0)
343
344     @skipped
345     def test_max_idle_many(self):
346         # This test is timing-sensitive.  Rename the function without
347         # the "dont" to run it, but beware that it could fail or take
348         # a while.
349
350         self.pool = self.create_pool(max_size=2, max_idle=0.02)
351         self.connection, conn2 = self.pool.get(), self.pool.get()
352         self.connection.close()
353         eventlet.sleep(0.01)
354         self.assertEqual(len(self.pool.free_items), 1)
355         conn2.close()
356         self.assertEqual(len(self.pool.free_items), 2)
357         eventlet.sleep(0.02)  # trigger cleanup of conn1 but not conn2
358         self.assertEqual(len(self.pool.free_items), 1)
359
360     @skipped
361     def test_max_age(self):
362         # This test is timing-sensitive.  Rename the function without
363         # the "dont" to run it, but beware that it could fail or take
364         # a while.
365
366         self.pool = self.create_pool(max_size=2, max_age=0.05)
367         self.connection = self.pool.get()
368         self.connection.close()
369         self.assertEqual(len(self.pool.free_items), 1)
370         eventlet.sleep(0.01)  # not long enough to trigger the age timeout
371         self.assertEqual(len(self.pool.free_items), 1)
372         self.connection = self.pool.get()
373         self.connection.close()
374         self.assertEqual(len(self.pool.free_items), 1)
375         eventlet.sleep(0.05)  # long enough to trigger age timeout
376         self.assertEqual(len(self.pool.free_items), 0)
377
378     @skipped
379     def test_max_age_many(self):
380         # This test is timing-sensitive.  Rename the function without
381         # the "dont" to run it, but beware that it could fail or take
382         # a while.
383
384         self.pool = self.create_pool(max_size=2, max_age=0.15)
385         self.connection, conn2 = self.pool.get(), self.pool.get()
386         self.connection.close()
387         self.assertEqual(len(self.pool.free_items), 1)
388         eventlet.sleep(0)  # not long enough to trigger the age timeout
389         self.assertEqual(len(self.pool.free_items), 1)
390         eventlet.sleep(0.2)  # long enough to trigger age timeout
391         self.assertEqual(len(self.pool.free_items), 0)
392         conn2.close()  # should not be added to the free items
393         self.assertEqual(len(self.pool.free_items), 0)
394
395     def test_waiters_get_woken(self):
396         # verify that when there's someone waiting on an empty pool
397         # and someone puts an immediately-closed connection back in
398         # the pool that the waiter gets woken
399         self.pool.put(self.connection)
400         self.pool.clear()
401         self.pool = self.create_pool(max_size=1, max_age=0)
402
403         self.connection = self.pool.get()
404         self.assertEqual(self.pool.free(), 0)
405         self.assertEqual(self.pool.waiting(), 0)
406         e = event.Event()
407
408         def retrieve(pool, ev):
409             c = pool.get()
410             ev.send(c)
411         eventlet.spawn(retrieve, self.pool, e)
412         eventlet.sleep(0)  # these two sleeps should advance the retrieve
413         eventlet.sleep(0)  # coroutine until it's waiting in get()
414         self.assertEqual(self.pool.free(), 0)
415         self.assertEqual(self.pool.waiting(), 1)
416         self.pool.put(self.connection)
417         timer = eventlet.Timeout(1)
418         conn = e.wait()
419         timer.cancel()
420         self.assertEqual(self.pool.free(), 0)
421         self.assertEqual(self.pool.waiting(), 0)
422         self.pool.put(conn)
423
424     @skipped
425     def test_0_straight_benchmark(self):
426         """ Benchmark; don't run unless you want to wait a while."""
427         import time
428         iterations = 20000
429         c = self.connection.cursor()
430         self.connection.commit()
431
432         def bench(c):
433             for i in six.moves.range(iterations):
434                 c.execute('select 1')
435
436         bench(c)  # warm-up
437         results = []
438         for i in range(3):
439             start = time.time()
440             bench(c)
441             end = time.time()
442             results.append(end - start)
443
444         print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
445             iterations, sum(results) / len(results), results, type(self)))
446
447     def test_raising_create(self):
448         # if the create() method raises an exception the pool should
449         # not lose any connections
450         self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
451         self.assertRaises(RuntimeError, self.pool.get)
452         self.assertEqual(self.pool.free(), 1)
453
454
455 class DummyConnection(object):
456     pass
457
458
459 class DummyDBModule(object):
460     def connect(self, *args, **kwargs):
461         return DummyConnection()
462
463
464 class RaisingDBModule(object):
465     def connect(self, *args, **kw):
466         raise RuntimeError()
467
468
469 class TpoolConnectionPool(DBConnectionPool):
470     __test__ = False  # so that nose doesn't try to execute this directly
471
472     def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
473                     connect_timeout=0.5, module=None):
474         if module is None:
475             module = self._dbmodule
476         return db_pool.TpooledConnectionPool(
477             module,
478             min_size=min_size, max_size=max_size,
479             max_idle=max_idle, max_age=max_age,
480             connect_timeout=connect_timeout,
481             **self._auth)
482
483     @skip_with_pyevent
484     def setUp(self):
485         super(TpoolConnectionPool, self).setUp()
486
487     def tearDown(self):
488         super(TpoolConnectionPool, self).tearDown()
489         from eventlet import tpool
490         tpool.killall()
491
492
493 class RawConnectionPool(DBConnectionPool):
494     __test__ = False  # so that nose doesn't try to execute this directly
495
496     def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
497                     connect_timeout=0.5, module=None):
498         if module is None:
499             module = self._dbmodule
500         return db_pool.RawConnectionPool(
501             module,
502             min_size=min_size, max_size=max_size,
503             max_idle=max_idle, max_age=max_age,
504             connect_timeout=connect_timeout,
505             **self._auth)
506
507
508 class TestRawConnectionPool(TestCase):
509     def test_issue_125(self):
510         # pool = self.create_pool(min_size=3, max_size=5)
511         pool = db_pool.RawConnectionPool(
512             DummyDBModule(),
513             dsn="dbname=test user=jessica port=5433",
514             min_size=3, max_size=5)
515         conn = pool.get()
516         pool.put(conn)
517
518     def test_custom_cleanup_ok(self):
519         cleanup_mock = mock.Mock()
520         pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
521         conn = pool.get()
522         pool.put(conn)
523         assert cleanup_mock.call_count == 1
524
525         with pool.item() as conn:
526             pass
527         assert cleanup_mock.call_count == 2
528
529     def test_custom_cleanup_arg_error(self):
530         cleanup_mock = mock.Mock(side_effect=NotImplementedError)
531         pool = db_pool.RawConnectionPool(DummyDBModule())
532         conn = pool.get()
533         pool.put(conn, cleanup=cleanup_mock)
534         assert cleanup_mock.call_count == 1
535
536         with pool.item(cleanup=cleanup_mock):
537             pass
538         assert cleanup_mock.call_count == 2
539
540     def test_custom_cleanup_fatal(self):
541         state = [0]
542
543         def cleanup(conn):
544             state[0] += 1
545             raise KeyboardInterrupt
546
547         pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
548         conn = pool.get()
549         try:
550             pool.put(conn)
551         except KeyboardInterrupt:
552             pass
553         else:
554             assert False, 'Expected KeyboardInterrupt'
555         assert state[0] == 1
556
557
558 get_auth = get_database_auth
559
560
561 def mysql_requirement(_f):
562     verbose = os.environ.get('eventlet_test_mysql_verbose')
563     try:
564         import MySQLdb
565         try:
566             auth = get_auth()['MySQLdb'].copy()
567             MySQLdb.connect(**auth)
568             return True
569         except MySQLdb.OperationalError:
570             if verbose:
571                 print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
572                 traceback.print_exc()
573             return False
574     except ImportError:
575         if verbose:
576             print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
577         return False
578
579
580 class MysqlConnectionPool(object):
581     dummy_table_sql = """CREATE TEMPORARY TABLE test_table
582         (
583         row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
584         value_int INTEGER,
585         value_float FLOAT,
586         value_string VARCHAR(200),
587         value_uuid CHAR(36),
588         value_binary BLOB,
589         value_binary_string VARCHAR(200) BINARY,
590         value_enum ENUM('Y','N'),
591         created TIMESTAMP
592         ) ENGINE=InnoDB;"""
593
594     @skip_unless(mysql_requirement)
595     def setUp(self):
596         import MySQLdb
597         self._dbmodule = MySQLdb
598         self._auth = get_auth()['MySQLdb']
599         super(MysqlConnectionPool, self).setUp()
600
601     def tearDown(self):
602         super(MysqlConnectionPool, self).tearDown()
603
604     def create_db(self):
605         auth = self._auth.copy()
606         try:
607             self.drop_db()
608         except Exception:
609             pass
610         dbname = 'test%s' % os.getpid()
611         db = self._dbmodule.connect(**auth).cursor()
612         db.execute("create database " + dbname)
613         db.close()
614         self._auth['db'] = dbname
615         del db
616
617     def drop_db(self):
618         db = self._dbmodule.connect(**self._auth).cursor()
619         db.execute("drop database " + self._auth['db'])
620         db.close()
621         del db
622
623
624 class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
625     __test__ = True
626
627
628 class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
629     __test__ = True
630
631
632 def postgres_requirement(_f):
633     try:
634         import psycopg2
635         try:
636             auth = get_auth()['psycopg2'].copy()
637             psycopg2.connect(**auth)
638             return True
639         except psycopg2.OperationalError:
640             print("Skipping postgres tests, error when connecting")
641             return False
642     except ImportError:
643         print("Skipping postgres tests, psycopg2 not importable")
644         return False
645
646
647 class Psycopg2ConnectionPool(object):
648     dummy_table_sql = """CREATE TEMPORARY TABLE test_table
649         (
650         row_id SERIAL PRIMARY KEY,
651         value_int INTEGER,
652         value_float FLOAT,
653         value_string VARCHAR(200),
654         value_uuid CHAR(36),
655         value_binary BYTEA,
656         value_binary_string BYTEA,
657         created TIMESTAMP
658         );"""
659
660     @skip_unless(postgres_requirement)
661     def setUp(self):
662         import psycopg2
663         self._dbmodule = psycopg2
664         self._auth = get_auth()['psycopg2']
665         super(Psycopg2ConnectionPool, self).setUp()
666
667     def tearDown(self):
668         super(Psycopg2ConnectionPool, self).tearDown()
669
670     def create_db(self):
671         dbname = 'test%s' % os.getpid()
672         self._auth['database'] = dbname
673         try:
674             self.drop_db()
675         except Exception:
676             pass
677         auth = self._auth.copy()
678         auth.pop('database')  # can't create if you're connecting to it
679         conn = self._dbmodule.connect(**auth)
680         conn.set_isolation_level(0)
681         db = conn.cursor()
682         db.execute("create database " + dbname)
683         db.close()
684         conn.close()
685
686     def drop_db(self):
687         auth = self._auth.copy()
688         auth.pop('database')  # can't drop database we connected to
689         conn = self._dbmodule.connect(**auth)
690         conn.set_isolation_level(0)
691         db = conn.cursor()
692         db.execute("drop database " + self._auth['database'])
693         db.close()
694         conn.close()
695
696
697 class TestPsycopg2Base(TestCase):
698     __test__ = False
699
700     def test_cursor_works_as_context_manager(self):
701         with self.connection.cursor() as c:
702             c.execute('select 1')
703             row = c.fetchone()
704             assert row == (1,)
705
706
707 class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
708     __test__ = True
709
710
711 class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
712     __test__ = True
713
714
715 if __name__ == '__main__':
716     main()