Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / mysqldb_test.py
1 from __future__ import print_function
2
3 import os
4 import time
5 import traceback
6
7 import eventlet
8 from eventlet import event
9 try:
10     from eventlet.green import MySQLdb
11 except ImportError:
12     MySQLdb = False
13 import tests
14 from tests import skip_unless, using_pyevent, get_database_auth
15
16
17 def mysql_requirement(_f):
18     """We want to skip tests if using pyevent, MySQLdb is not installed, or if
19     there is no database running on the localhost that the auth file grants
20     us access to.
21
22     This errs on the side of skipping tests if everything is not right, but
23     it's better than a million tests failing when you don't care about mysql
24     support."""
25     if using_pyevent(_f):
26         return False
27     if MySQLdb is False:
28         print("Skipping mysql tests, MySQLdb not importable")
29         return False
30     try:
31         auth = get_database_auth()['MySQLdb'].copy()
32         MySQLdb.connect(**auth)
33         return True
34     except MySQLdb.OperationalError:
35         print("Skipping mysql tests, error when connecting:")
36         traceback.print_exc()
37         return False
38
39
40 class TestMySQLdb(tests.LimitedTestCase):
41     def setUp(self):
42         self._auth = get_database_auth()['MySQLdb']
43         self.create_db()
44         self.connection = None
45         self.connection = MySQLdb.connect(**self._auth)
46         cursor = self.connection.cursor()
47         cursor.execute("""CREATE TABLE gargleblatz
48         (
49         a INTEGER
50         );""")
51         self.connection.commit()
52         cursor.close()
53
54         super(TestMySQLdb, self).setUp()
55
56     def tearDown(self):
57         if self.connection:
58             self.connection.close()
59         self.drop_db()
60
61         super(TestMySQLdb, self).tearDown()
62
63     @skip_unless(mysql_requirement)
64     def create_db(self):
65         auth = self._auth.copy()
66         try:
67             self.drop_db()
68         except Exception:
69             pass
70         dbname = 'test_%d_%d' % (os.getpid(), int(time.time() * 1000))
71         db = MySQLdb.connect(**auth).cursor()
72         db.execute("create database " + dbname)
73         db.close()
74         self._auth['db'] = dbname
75         del db
76
77     def drop_db(self):
78         db = MySQLdb.connect(**self._auth).cursor()
79         db.execute("drop database " + self._auth['db'])
80         db.close()
81         del db
82
83     def set_up_dummy_table(self, connection=None):
84         close_connection = False
85         if connection is None:
86             close_connection = True
87             if self.connection is None:
88                 connection = MySQLdb.connect(**self._auth)
89             else:
90                 connection = self.connection
91
92         cursor = connection.cursor()
93         cursor.execute(self.dummy_table_sql)
94         connection.commit()
95         cursor.close()
96         if close_connection:
97             connection.close()
98
99     dummy_table_sql = """CREATE TEMPORARY TABLE test_table
100         (
101         row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
102         value_int INTEGER,
103         value_float FLOAT,
104         value_string VARCHAR(200),
105         value_uuid CHAR(36),
106         value_binary BLOB,
107         value_binary_string VARCHAR(200) BINARY,
108         value_enum ENUM('Y','N'),
109         created TIMESTAMP
110         ) ENGINE=InnoDB;"""
111
112     def assert_cursor_yields(self, curs):
113         counter = [0]
114
115         def tick():
116             while True:
117                 counter[0] += 1
118                 eventlet.sleep()
119         gt = eventlet.spawn(tick)
120         curs.execute("select 1")
121         rows = curs.fetchall()
122         self.assertEqual(len(rows), 1)
123         self.assertEqual(len(rows[0]), 1)
124         self.assertEqual(rows[0][0], 1)
125         assert counter[0] > 0, counter[0]
126         gt.kill()
127
128     def assert_cursor_works(self, cursor):
129         cursor.execute("select 1")
130         rows = cursor.fetchall()
131         self.assertEqual(len(rows), 1)
132         self.assertEqual(len(rows[0]), 1)
133         self.assertEqual(rows[0][0], 1)
134         self.assert_cursor_yields(cursor)
135
136     def assert_connection_works(self, conn):
137         curs = conn.cursor()
138         self.assert_cursor_works(curs)
139
140     def test_module_attributes(self):
141         import MySQLdb as orig
142         for key in dir(orig):
143             if key not in ('__author__', '__path__', '__revision__',
144                            '__version__', '__loader__'):
145                 assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key))
146
147     def test_connecting(self):
148         assert self.connection is not None
149
150     def test_connecting_annoyingly(self):
151         self.assert_connection_works(MySQLdb.Connect(**self._auth))
152         self.assert_connection_works(MySQLdb.Connection(**self._auth))
153         self.assert_connection_works(MySQLdb.connections.Connection(**self._auth))
154
155     def test_create_cursor(self):
156         cursor = self.connection.cursor()
157         cursor.close()
158
159     def test_run_query(self):
160         cursor = self.connection.cursor()
161         self.assert_cursor_works(cursor)
162         cursor.close()
163
164     def test_run_bad_query(self):
165         cursor = self.connection.cursor()
166         try:
167             cursor.execute("garbage blah blah")
168             assert False
169         except AssertionError:
170             raise
171         except Exception:
172             pass
173         cursor.close()
174
175     def fill_up_table(self, conn):
176         curs = conn.cursor()
177         for i in range(1000):
178             curs.execute('insert into test_table (value_int) values (%s)' % i)
179         conn.commit()
180
181     def test_yields(self):
182         conn = self.connection
183         self.set_up_dummy_table(conn)
184         self.fill_up_table(conn)
185         curs = conn.cursor()
186         results = []
187         SHORT_QUERY = "select * from test_table"
188         evt = event.Event()
189
190         def a_query():
191             self.assert_cursor_works(curs)
192             curs.execute(SHORT_QUERY)
193             results.append(2)
194             evt.send()
195         eventlet.spawn(a_query)
196         results.append(1)
197         self.assertEqual([1], results)
198         evt.wait()
199         self.assertEqual([1, 2], results)
200
201     def test_visibility_from_other_connections(self):
202         conn = MySQLdb.connect(**self._auth)
203         conn2 = MySQLdb.connect(**self._auth)
204         curs = conn.cursor()
205         try:
206             curs2 = conn2.cursor()
207             curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
208             self.assertEqual(curs2.rowcount, 1)
209             conn2.commit()
210             selection_query = "select * from gargleblatz"
211             curs2.execute(selection_query)
212             self.assertEqual(curs2.rowcount, 1)
213             del curs2, conn2
214             # create a new connection, it should see the addition
215             conn3 = MySQLdb.connect(**self._auth)
216             curs3 = conn3.cursor()
217             curs3.execute(selection_query)
218             self.assertEqual(curs3.rowcount, 1)
219             # now, does the already-open connection see it?
220             curs.execute(selection_query)
221             self.assertEqual(curs.rowcount, 1)
222             del curs3, conn3
223         finally:
224             # clean up my litter
225             curs.execute("delete from gargleblatz where a=314159")
226             conn.commit()
227
228
229 class TestMonkeyPatch(tests.LimitedTestCase):
230     @skip_unless(mysql_requirement)
231     def test_monkey_patching(self):
232         tests.run_isolated('mysqldb_monkey_patch.py')