Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / db_pool_test.py
1 '''Test cases for db_pool
2 '''
3 from __future__ import print_function
4
5 import sys
6 import os
7 import traceback
8 from unittest import TestCase, main
9
10 from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
11 from eventlet import event
12 from eventlet import db_pool
13 from eventlet.support import six
14 import eventlet
15
16
class DBTester(object):
    """Database fixtures shared by the pool test cases.

    The class reads ``self._dbmodule`` (an object with ``connect``),
    ``self._auth`` (keyword arguments for ``connect``) and
    ``self.dummy_table_sql``, and calls ``self.create_db()`` and
    ``self.drop_db()``; all of these must be supplied by a subclass or mixin.
    """
    __test__ = False  # so that nose doesn't try to execute this directly

    def setUp(self):
        """Create the scratch database and the ``gargleblatz`` table in it."""
        self.create_db()
        self.connection = None
        connection = self._dbmodule.connect(**self._auth)
        try:
            cursor = connection.cursor()
            try:
                cursor.execute("""CREATE  TABLE gargleblatz
        (
        a INTEGER
        );""")
                connection.commit()
            finally:
                cursor.close()
        finally:
            # release the bootstrap connection even if table creation failed
            connection.close()

    def tearDown(self):
        """Close ``self.connection`` if it is truthy, then drop the database."""
        try:
            if self.connection:
                self.connection.close()
        finally:
            # always attempt the drop so a failed close does not strand the db
            self.drop_db()

    def set_up_dummy_table(self, connection=None):
        """Execute ``self.dummy_table_sql`` and commit.

        :param connection: connection to use.  When ``None``,
            ``self.connection`` is used if it is not ``None``; otherwise a
            temporary connection is opened with ``self._dbmodule.connect``.

        Only a connection opened by this method is closed by it; a connection
        passed in, or borrowed from ``self.connection``, is left open for its
        owner.
        """
        owns_connection = False
        if connection is None:
            connection = self.connection
            if connection is None:
                connection = self._dbmodule.connect(**self._auth)
                owns_connection = True

        cursor = connection.cursor()
        try:
            cursor.execute(self.dummy_table_sql)
            connection.commit()
        finally:
            cursor.close()
            if owns_connection:
                connection.close()
53
54
55 # silly mock class
class Mock(object):
    """Empty object; tests attach whatever attributes they need to it."""
58
59
60 class DBConnectionPool(DBTester):
61     __test__ = False  # so that nose doesn't try to execute this directly
62
63     def setUp(self):
64         super(DBConnectionPool, self).setUp()
65         self.pool = self.create_pool()
66         self.connection = self.pool.get()
67
68     def tearDown(self):
69         if self.connection:
70             self.pool.put(self.connection)
71         self.pool.clear()
72         super(DBConnectionPool, self).tearDown()
73
74     def assert_cursor_works(self, cursor):
75         cursor.execute("select 1")
76         rows = cursor.fetchall()
77         assert rows
78
79     def test_connecting(self):
80         assert self.connection is not None
81
82     def test_create_cursor(self):
83         cursor = self.connection.cursor()
84         cursor.close()
85
86     def test_run_query(self):
87         cursor = self.connection.cursor()
88         self.assert_cursor_works(cursor)
89         cursor.close()
90
91     def test_run_bad_query(self):
92         cursor = self.connection.cursor()
93         try:
94             cursor.execute("garbage blah blah")
95             assert False
96         except AssertionError:
97             raise
98         except Exception:
99             pass
100         cursor.close()
101
102     def test_put_none(self):
103         # the pool is of size 1, and its only connection is out
104         assert self.pool.free() == 0
105         self.pool.put(None)
106         # ha ha we fooled it into thinking that we had a dead process
107         assert self.pool.free() == 1
108         conn2 = self.pool.get()
109         assert conn2 is not None
110         assert conn2.cursor
111         self.pool.put(conn2)
112
113     def test_close_does_a_put(self):
114         assert self.pool.free() == 0
115         self.connection.close()
116         assert self.pool.free() == 1
117         self.assertRaises(AttributeError, self.connection.cursor)
118
119     @skipped
120     def test_deletion_does_a_put(self):
121         # doing a put on del causes some issues if __del__ is called in the
122         # main coroutine, so, not doing that for now
123         assert self.pool.free() == 0
124         self.connection = None
125         assert self.pool.free() == 1
126
127     def test_put_doesnt_double_wrap(self):
128         self.pool.put(self.connection)
129         conn = self.pool.get()
130         assert not isinstance(conn._base, db_pool.PooledConnectionWrapper)
131         self.pool.put(conn)
132
133     def test_bool(self):
134         assert self.connection
135         self.connection.close()
136         assert not self.connection
137
138     def fill_up_table(self, conn):
139         curs = conn.cursor()
140         for i in six.moves.range(1000):
141             curs.execute('insert into test_table (value_int) values (%s)' % i)
142         conn.commit()
143
144     def test_returns_immediately(self):
145         self.pool = self.create_pool()
146         conn = self.pool.get()
147         self.set_up_dummy_table(conn)
148         self.fill_up_table(conn)
149         curs = conn.cursor()
150         results = []
151         SHORT_QUERY = "select * from test_table"
152         evt = event.Event()
153
154         def a_query():
155             self.assert_cursor_works(curs)
156             curs.execute(SHORT_QUERY)
157             results.append(2)
158             evt.send()
159         eventlet.spawn(a_query)
160         results.append(1)
161         self.assertEqual([1], results)
162         evt.wait()
163         self.assertEqual([1, 2], results)
164         self.pool.put(conn)
165
166     def test_connection_is_clean_after_put(self):
167         self.pool = self.create_pool()
168         conn = self.pool.get()
169         self.set_up_dummy_table(conn)
170         curs = conn.cursor()
171         for i in range(10):
172             curs.execute('insert into test_table (value_int) values (%s)' % i)
173         # do not commit  :-)
174         self.pool.put(conn)
175         del conn
176         conn2 = self.pool.get()
177         curs2 = conn2.cursor()
178         for i in range(10):
179             curs2.execute('insert into test_table (value_int) values (%s)' % i)
180         conn2.commit()
181         curs2.execute("select * from test_table")
182         # we should have only inserted them once
183         self.assertEqual(10, curs2.rowcount)
184         self.pool.put(conn2)
185
186     def test_visibility_from_other_connections(self):
187         self.pool = self.create_pool(max_size=3)
188         conn = self.pool.get()
189         conn2 = self.pool.get()
190         curs = conn.cursor()
191         try:
192             curs2 = conn2.cursor()
193             curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
194             self.assertEqual(curs2.rowcount, 1)
195             conn2.commit()
196             selection_query = "select * from gargleblatz"
197             curs2.execute(selection_query)
198             self.assertEqual(curs2.rowcount, 1)
199             del curs2
200             self.pool.put(conn2)
201             # create a new connection, it should see the addition
202             conn3 = self.pool.get()
203             curs3 = conn3.cursor()
204             curs3.execute(selection_query)
205             self.assertEqual(curs3.rowcount, 1)
206             # now, does the already-open connection see it?
207             curs.execute(selection_query)
208             self.assertEqual(curs.rowcount, 1)
209             self.pool.put(conn3)
210         finally:
211             # clean up my litter
212             curs.execute("delete from gargleblatz where a=314159")
213             conn.commit()
214             self.pool.put(conn)
215
216     @skipped
217     def test_two_simultaneous_connections(self):
218         # timing-sensitive test, disabled until we come up with a better
219         # way to do this
220         self.pool = self.create_pool(max_size=2)
221         conn = self.pool.get()
222         self.set_up_dummy_table(conn)
223         self.fill_up_table(conn)
224         curs = conn.cursor()
225         conn2 = self.pool.get()
226         self.set_up_dummy_table(conn2)
227         self.fill_up_table(conn2)
228         curs2 = conn2.cursor()
229         results = []
230         LONG_QUERY = "select * from test_table"
231         SHORT_QUERY = "select * from test_table where row_id <= 20"
232
233         evt = event.Event()
234
235         def long_running_query():
236             self.assert_cursor_works(curs)
237             curs.execute(LONG_QUERY)
238             results.append(1)
239             evt.send()
240         evt2 = event.Event()
241
242         def short_running_query():
243             self.assert_cursor_works(curs2)
244             curs2.execute(SHORT_QUERY)
245             results.append(2)
246             evt2.send()
247
248         eventlet.spawn(long_running_query)
249         eventlet.spawn(short_running_query)
250         evt.wait()
251         evt2.wait()
252         results.sort()
253         self.assertEqual([1, 2], results)
254
255     def test_clear(self):
256         self.pool = self.create_pool()
257         self.pool.put(self.connection)
258         self.pool.clear()
259         self.assertEqual(len(self.pool.free_items), 0)
260
261     def test_clear_warmup(self):
262         """Clear implicitly created connections (min_size > 0)"""
263         self.pool = self.create_pool(min_size=1)
264         self.pool.clear()
265         self.assertEqual(len(self.pool.free_items), 0)
266
267     def test_unwrap_connection(self):
268         self.assert_(isinstance(self.connection,
269                                 db_pool.GenericConnectionWrapper))
270         conn = self.pool._unwrap_connection(self.connection)
271         assert not isinstance(conn, db_pool.GenericConnectionWrapper)
272
273         self.assertEqual(None, self.pool._unwrap_connection(None))
274         self.assertEqual(None, self.pool._unwrap_connection(1))
275
276         # testing duck typing here -- as long as the connection has a
277         # _base attribute, it should be unwrappable
278         x = Mock()
279         x._base = 'hi'
280         self.assertEqual('hi', self.pool._unwrap_connection(x))
281         conn.close()
282
283     def test_safe_close(self):
284         self.pool._safe_close(self.connection, quiet=True)
285         self.assertEqual(len(self.pool.free_items), 1)
286
287         self.pool._safe_close(None)
288         self.pool._safe_close(1)
289
290         # now we're really going for 100% coverage
291         x = Mock()
292
293         def fail():
294             raise KeyboardInterrupt()
295         x.close = fail
296         self.assertRaises(KeyboardInterrupt, self.pool._safe_close, x)
297
298         x = Mock()
299
300         def fail2():
301             raise RuntimeError("if this line has been printed, the test succeeded")
302         x.close = fail2
303         self.pool._safe_close(x, quiet=False)
304
305     def test_zero_max_idle(self):
306         self.pool.put(self.connection)
307         self.pool.clear()
308         self.pool = self.create_pool(max_size=2, max_idle=0)
309         self.connection = self.pool.get()
310         self.connection.close()
311         self.assertEqual(len(self.pool.free_items), 0)
312
313     def test_zero_max_age(self):
314         self.pool.put(self.connection)
315         self.pool.clear()
316         self.pool = self.create_pool(max_size=2, max_age=0)
317         self.connection = self.pool.get()
318         self.connection.close()
319         self.assertEqual(len(self.pool.free_items), 0)
320
    @skipped
    def test_max_idle(self):
        """An idle connection leaves ``free_items`` only after ``max_idle`` elapses."""
        # This test is timing-sensitive, which is why it carries @skipped.
        # Remove the decorator to run it, but beware that it could fail or
        # take a while.

        self.pool = self.create_pool(max_size=2, max_idle=0.02)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0.01)  # not long enough to trigger the idle timeout
        self.assertEqual(len(self.pool.free_items), 1)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0.01)  # idle timeout should have fired but done nothing
        self.assertEqual(len(self.pool.free_items), 1)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0.03)  # long enough to trigger idle timeout for real
        self.assertEqual(len(self.pool.free_items), 0)
343
    @skipped
    def test_max_idle_many(self):
        """Idle expiry is per connection: the older one goes, the newer one stays."""
        # This test is timing-sensitive, which is why it carries @skipped.
        # Remove the decorator to run it, but beware that it could fail or
        # take a while.

        self.pool = self.create_pool(max_size=2, max_idle=0.02)
        self.connection, conn2 = self.pool.get(), self.pool.get()
        self.connection.close()
        eventlet.sleep(0.01)
        self.assertEqual(len(self.pool.free_items), 1)
        conn2.close()
        self.assertEqual(len(self.pool.free_items), 2)
        eventlet.sleep(0.02)  # trigger cleanup of conn1 but not conn2
        self.assertEqual(len(self.pool.free_items), 1)
359
    @skipped
    def test_max_age(self):
        """A connection leaves ``free_items`` once it is older than ``max_age``."""
        # This test is timing-sensitive, which is why it carries @skipped.
        # Remove the decorator to run it, but beware that it could fail or
        # take a while.

        self.pool = self.create_pool(max_size=2, max_age=0.05)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0.01)  # not long enough to trigger the age timeout
        self.assertEqual(len(self.pool.free_items), 1)
        self.connection = self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0.05)  # long enough to trigger age timeout
        self.assertEqual(len(self.pool.free_items), 0)
377
    @skipped
    def test_max_age_many(self):
        """A connection closed after ``max_age`` has passed is not kept as free."""
        # This test is timing-sensitive, which is why it carries @skipped.
        # Remove the decorator to run it, but beware that it could fail or
        # take a while.

        self.pool = self.create_pool(max_size=2, max_age=0.15)
        self.connection, conn2 = self.pool.get(), self.pool.get()
        self.connection.close()
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0)  # not long enough to trigger the age timeout
        self.assertEqual(len(self.pool.free_items), 1)
        eventlet.sleep(0.2)  # long enough to trigger age timeout
        self.assertEqual(len(self.pool.free_items), 0)
        conn2.close()  # should not be added to the free items
        self.assertEqual(len(self.pool.free_items), 0)
394
395     def test_waiters_get_woken(self):
396         # verify that when there's someone waiting on an empty pool
397         # and someone puts an immediately-closed connection back in
398         # the pool that the waiter gets woken
399         self.pool.put(self.connection)
400         self.pool.clear()
401         self.pool = self.create_pool(max_size=1, max_age=0)
402
403         self.connection = self.pool.get()
404         self.assertEqual(self.pool.free(), 0)
405         self.assertEqual(self.pool.waiting(), 0)
406         e = event.Event()
407
408         def retrieve(pool, ev):
409             c = pool.get()
410             ev.send(c)
411         eventlet.spawn(retrieve, self.pool, e)
412         eventlet.sleep(0)  # these two sleeps should advance the retrieve
413         eventlet.sleep(0)  # coroutine until it's waiting in get()
414         self.assertEqual(self.pool.free(), 0)
415         self.assertEqual(self.pool.waiting(), 1)
416         self.pool.put(self.connection)
417         timer = eventlet.Timeout(1)
418         conn = e.wait()
419         timer.cancel()
420         self.assertEqual(self.pool.free(), 0)
421         self.assertEqual(self.pool.waiting(), 0)
422         self.pool.put(conn)
423
424     @skipped
425     def test_0_straight_benchmark(self):
426         """ Benchmark; don't run unless you want to wait a while."""
427         import time
428         iterations = 20000
429         c = self.connection.cursor()
430         self.connection.commit()
431
432         def bench(c):
433             for i in six.moves.range(iterations):
434                 c.execute('select 1')
435
436         bench(c)  # warm-up
437         results = []
438         for i in range(3):
439             start = time.time()
440             bench(c)
441             end = time.time()
442             results.append(end - start)
443
444         print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
445             iterations, sum(results) / len(results), results, type(self)))
446
447     def test_raising_create(self):
448         # if the create() method raises an exception the pool should
449         # not lose any connections
450         self.pool = self.create_pool(max_size=1, module=RaisingDBModule())
451         self.assertRaises(RuntimeError, self.pool.get)
452         self.assertEqual(self.pool.free(), 1)
453
454
class DummyConnection(object):
    """Stand-in connection whose only method is a no-op ``rollback``."""

    def rollback(self):
        """Do nothing and return ``None``."""
        return None
458
459
class DummyDBModule(object):
    """Stand-in DB module: ``connect`` ignores its arguments."""

    def connect(self, *args, **kwargs):
        """Return a new ``DummyConnection`` whatever the arguments."""
        connection = DummyConnection()
        return connection
463
464
class RaisingDBModule(object):
    """Stand-in DB module whose ``connect`` always fails."""

    def connect(self, *args, **kw):
        """Raise ``RuntimeError`` whatever the arguments."""
        failure = RuntimeError()
        raise failure
468
469
class TpoolConnectionPool(DBConnectionPool):
    """Runs the pool tests against ``db_pool.TpooledConnectionPool``."""
    __test__ = False  # so that nose doesn't try to execute this directly

    def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
                    connect_timeout=0.5, module=None):
        """Build a ``TpooledConnectionPool``.

        *module* defaults to ``self._dbmodule``; ``self._auth`` is passed on
        as extra keyword arguments.
        """
        dbmodule = self._dbmodule if module is None else module
        return db_pool.TpooledConnectionPool(
            dbmodule,
            min_size=min_size,
            max_size=max_size,
            max_idle=max_idle,
            max_age=max_age,
            connect_timeout=connect_timeout,
            **self._auth)

    @skip_with_pyevent
    def setUp(self):
        """Delegate to the parent ``setUp`` under ``skip_with_pyevent``."""
        super(TpoolConnectionPool, self).setUp()

    def tearDown(self):
        """Run the parent ``tearDown``, then call ``tpool.killall()``."""
        super(TpoolConnectionPool, self).tearDown()
        from eventlet import tpool
        tpool.killall()
492
493
class RawConnectionPool(DBConnectionPool):
    """Runs the pool tests against ``db_pool.RawConnectionPool``."""
    __test__ = False  # so that nose doesn't try to execute this directly

    def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
                    connect_timeout=0.5, module=None):
        """Build a ``RawConnectionPool``.

        *module* defaults to ``self._dbmodule``; ``self._auth`` is passed on
        as extra keyword arguments.
        """
        dbmodule = self._dbmodule if module is None else module
        return db_pool.RawConnectionPool(
            dbmodule,
            min_size=min_size,
            max_size=max_size,
            max_idle=max_idle,
            max_age=max_age,
            connect_timeout=connect_timeout,
            **self._auth)
507
508
def test_raw_pool_issue_125():
    """A raw pool built with a ``dsn`` and ``min_size=3`` supports get/put."""
    # pool = self.create_pool(min_size=3, max_size=5)
    dummy_module = DummyDBModule()
    pool = db_pool.RawConnectionPool(
        dummy_module,
        dsn="dbname=test user=jessica port=5433",
        min_size=3, max_size=5)
    pool.put(pool.get())
517
518
def test_raw_pool_custom_cleanup_ok():
    """A pool-level ``cleanup`` is called once per returned connection."""
    cleanup_spy = mock.Mock()
    pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_spy)

    # explicit get/put
    pool.put(pool.get())
    assert cleanup_spy.call_count == 1

    # return through the item() context manager
    with pool.item():
        pass
    assert cleanup_spy.call_count == 2
529
530
def test_raw_pool_custom_cleanup_arg_error():
    """A per-call ``cleanup`` that raises is still called once per return."""
    failing_cleanup = mock.Mock(side_effect=NotImplementedError)
    pool = db_pool.RawConnectionPool(DummyDBModule())

    # explicit put with a cleanup argument
    pool.put(pool.get(), cleanup=failing_cleanup)
    assert failing_cleanup.call_count == 1

    # item() context manager with a cleanup argument
    with pool.item(cleanup=failing_cleanup):
        pass
    assert failing_cleanup.call_count == 2
541
542
def test_raw_pool_custom_cleanup_fatal():
    """``KeyboardInterrupt`` raised by ``cleanup`` escapes ``pool.put``."""
    calls = [0]  # one-element list so the closure can mutate the counter

    def cleanup(conn):
        calls[0] += 1
        raise KeyboardInterrupt

    pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup)
    conn = pool.get()
    interrupted = False
    try:
        pool.put(conn)
    except KeyboardInterrupt:
        interrupted = True
    assert interrupted, 'Expected KeyboardInterrupt'
    assert calls[0] == 1
559
560
def test_raw_pool_clear_update_current_size():
    """``clear()`` must reset ``current_size`` along with ``free_items``."""
    # https://github.com/eventlet/eventlet/issues/139
    # BaseConnectionPool.clear does not update .current_size.
    # That leads to situation when new connections could not be created.
    pool = db_pool.RawConnectionPool(DummyDBModule())
    conn = pool.get()
    conn.close()
    assert pool.current_size == 1
    assert len(pool.free_items) == 1

    pool.clear()
    assert pool.current_size == 0
    assert len(pool.free_items) == 0
572
573
# Short alias for the imported get_database_auth helper; the requirement
# checks and backend mixins in this module look credentials up through it.
get_auth = get_database_auth
575
576
def mysql_requirement(_f):
    """Report whether the MySQL tests can run.

    :param _f: unused; accepted because ``skip_unless`` is given this
        function as its requirement callable.
    :returns: ``True`` when ``MySQLdb`` imports and a connection can be
        opened with the ``'MySQLdb'`` entry of ``get_auth()``; ``False`` when
        the import fails or ``connect`` raises ``MySQLdb.OperationalError``.

    Diagnostics go to stderr only when the ``eventlet_test_mysql_verbose``
    environment variable is set.
    """
    verbose = os.environ.get('eventlet_test_mysql_verbose')
    try:
        import MySQLdb
        try:
            auth = get_auth()['MySQLdb'].copy()
            probe = MySQLdb.connect(**auth)
            # the connection only proves reachability; do not leave it open
            probe.close()
            return True
        except MySQLdb.OperationalError:
            if verbose:
                print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
                traceback.print_exc()
            return False
    except ImportError:
        if verbose:
            print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
        return False
594
595
class MysqlConnectionPool(object):
    """Mixin supplying the MySQLdb module, credentials and database fixtures.

    ``setUp`` sets ``self._dbmodule`` and ``self._auth``; ``create_db`` and
    ``drop_db`` manage a database named ``test<pid>``.
    """
    # table created by set_up_dummy_table(); temporary, so per-connection
    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
        (
        row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
        value_int INTEGER,
        value_float FLOAT,
        value_string VARCHAR(200),
        value_uuid CHAR(36),
        value_binary BLOB,
        value_binary_string VARCHAR(200) BINARY,
        value_enum ENUM('Y','N'),
        created TIMESTAMP
        ) ENGINE=InnoDB;"""

    @skip_unless(mysql_requirement)
    def setUp(self):
        """Bind MySQLdb and its credentials, then continue up the MRO."""
        import MySQLdb
        self._dbmodule = MySQLdb
        self._auth = get_auth()['MySQLdb']
        super(MysqlConnectionPool, self).setUp()

    def tearDown(self):
        """Delegate to the next ``tearDown`` in the MRO."""
        super(MysqlConnectionPool, self).tearDown()

    def create_db(self):
        """Create the ``test<pid>`` database and record it in ``self._auth['db']``.

        A leftover database is dropped first on a best-effort basis; any
        exception from that attempt is ignored.
        """
        # copy taken before 'db' is set, so the server connection below does
        # not select the database that is about to be created
        auth = self._auth.copy()
        try:
            self.drop_db()
        except Exception:
            pass
        dbname = 'test%s' % os.getpid()
        connection = self._dbmodule.connect(**auth)
        try:
            cursor = connection.cursor()
            cursor.execute("create database " + dbname)
            cursor.close()
        finally:
            # close the connection itself, not just its cursor
            connection.close()
        self._auth['db'] = dbname

    def drop_db(self):
        """Drop the database named by ``self._auth['db']``."""
        connection = self._dbmodule.connect(**self._auth)
        try:
            cursor = connection.cursor()
            cursor.execute("drop database " + self._auth['db'])
            cursor.close()
        finally:
            # close the connection itself, not just its cursor
            connection.close()
638
639
class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
    """Runnable combination: MySQL fixtures with the tpool-backed pool tests."""
    __test__ = True
642
643
class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
    """Runnable combination: MySQL fixtures with the raw pool tests."""
    __test__ = True
646
647
def postgres_requirement(_f):
    """Report whether the PostgreSQL tests can run.

    :param _f: unused; accepted because ``skip_unless`` is given this
        function as its requirement callable.
    :returns: ``True`` when ``psycopg2`` imports and a connection can be
        opened with the ``'psycopg2'`` entry of ``get_auth()``; ``False`` when
        the import fails or ``connect`` raises ``psycopg2.OperationalError``.
        A message is printed in both ``False`` cases.
    """
    try:
        import psycopg2
        try:
            auth = get_auth()['psycopg2'].copy()
            probe = psycopg2.connect(**auth)
            # the connection only proves reachability; do not leave it open
            probe.close()
            return True
        except psycopg2.OperationalError:
            print("Skipping postgres tests, error when connecting")
            return False
    except ImportError:
        print("Skipping postgres tests, psycopg2 not importable")
        return False
661
662
class Psycopg2ConnectionPool(object):
    """Mixin supplying the psycopg2 module, credentials and database fixtures.

    ``setUp`` sets ``self._dbmodule`` and ``self._auth``; ``create_db`` and
    ``drop_db`` manage a database named ``test<pid>``.
    """
    # table created by set_up_dummy_table(); temporary, so per-connection
    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
        (
        row_id SERIAL PRIMARY KEY,
        value_int INTEGER,
        value_float FLOAT,
        value_string VARCHAR(200),
        value_uuid CHAR(36),
        value_binary BYTEA,
        value_binary_string BYTEA,
        created TIMESTAMP
        );"""

    @skip_unless(postgres_requirement)
    def setUp(self):
        """Bind psycopg2 and its credentials, then continue up the MRO."""
        import psycopg2
        self._dbmodule = psycopg2
        self._auth = get_auth()['psycopg2']
        super(Psycopg2ConnectionPool, self).setUp()

    def tearDown(self):
        """Delegate to the next ``tearDown`` in the MRO."""
        super(Psycopg2ConnectionPool, self).tearDown()

    def _run_outside_database(self, statement):
        """Execute *statement* on a connection opened without ``database``."""
        server_auth = self._auth.copy()
        # can't create or drop a database while connected to it
        server_auth.pop('database')
        server_conn = self._dbmodule.connect(**server_auth)
        # NOTE(review): level 0 is presumably psycopg2's autocommit level,
        # needed to run these statements outside a transaction -- confirm
        server_conn.set_isolation_level(0)
        cursor = server_conn.cursor()
        cursor.execute(statement)
        cursor.close()
        server_conn.close()

    def create_db(self):
        """Record ``test<pid>`` in ``self._auth['database']`` and create it.

        A leftover database is dropped first on a best-effort basis; any
        exception from that attempt is ignored.
        """
        dbname = 'test%s' % os.getpid()
        self._auth['database'] = dbname
        try:
            self.drop_db()
        except Exception:
            pass
        self._run_outside_database("create database " + dbname)

    def drop_db(self):
        """Drop the database named by ``self._auth['database']``."""
        self._run_outside_database("drop database " + self._auth['database'])
711
712
class TestPsycopg2Base(TestCase):
    """Extra test for the psycopg2 combinations; not collected by itself."""
    __test__ = False

    def test_cursor_works_as_context_manager(self):
        """A cursor from ``self.connection`` works inside a ``with`` block."""
        with self.connection.cursor() as cursor:
            cursor.execute('select 1')
            fetched = cursor.fetchone()
            assert fetched == (1,)
721
722
class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
    """Runnable combination: psycopg2 fixtures with the tpool-backed pool tests."""
    __test__ = True
725
726
class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
    """Runnable combination: psycopg2 fixtures with the raw pool tests."""
    __test__ = True
729
730
# Run the tests through unittest's main() when executed as a script.
if __name__ == '__main__':
    main()