Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / tests / mysqldb_test.py
1 from __future__ import print_function
2
3 import os
4 import time
5 import traceback
6
7 import eventlet
8 from eventlet import event
9 from tests import (
10     LimitedTestCase,
11     run_python,
12     skip_unless, using_pyevent, get_database_auth,
13 )
14 try:
15     from eventlet.green import MySQLdb
16 except ImportError:
17     MySQLdb = False
18
19
def mysql_requirement(_f):
    """Return True only when the MySQL tests are able to run.

    We want to skip tests if using pyevent, MySQLdb is not installed, or if
    there is no database running on the localhost that the auth file grants
    us access to.

    This errs on the side of skipping tests if everything is not right, but
    it's better than a million tests failing when you don't care about mysql
    support.

    :param _f: the object handed over by the skip decorator; it is only
        forwarded to ``using_pyevent``.
    :returns: True if a probe connection could be opened, False otherwise.
    """
    if using_pyevent(_f):
        return False
    if MySQLdb is False:
        print("Skipping mysql tests, MySQLdb not importable")
        return False
    try:
        auth = get_database_auth()['MySQLdb'].copy()
        probe = MySQLdb.connect(**auth)
    except MySQLdb.OperationalError:
        print("Skipping mysql tests, error when connecting:")
        traceback.print_exc()
        return False
    # The connection only proves the server is reachable; close it right
    # away instead of leaving it for the garbage collector.
    probe.close()
    return True
41
42
class TestMySQLdb(LimitedTestCase):
    """Tests for ``eventlet.green.MySQLdb`` against a live MySQL server.

    ``setUp`` creates a scratch database (see ``create_db``) holding a single
    table ``gargleblatz`` and opens ``self.connection`` to it; ``tearDown``
    closes that connection and drops the database again.
    """

    def setUp(self):
        """Create the scratch database, connect to it and create ``gargleblatz``."""
        self.connection = None
        # Work on a copy (as mysql_requirement does): create_db() stores the
        # scratch database name in this dict, and that must not leak into
        # whatever object get_database_auth() handed out.
        self._auth = get_database_auth()['MySQLdb'].copy()
        self.create_db()
        self.connection = MySQLdb.connect(**self._auth)
        cursor = self.connection.cursor()
        cursor.execute("""CREATE TABLE gargleblatz
        (
        a INTEGER
        );""")
        self.connection.commit()
        cursor.close()

        super(TestMySQLdb, self).setUp()

    def tearDown(self):
        """Close the shared connection, drop the scratch database, chain up.

        Every step runs even if an earlier one raises, so a failing close
        does not leave the database behind and the base class tearDown is
        always reached.
        """
        try:
            if self.connection:
                self.connection.close()
        finally:
            try:
                self.drop_db()
            finally:
                super(TestMySQLdb, self).tearDown()

    # NOTE(review): the skip decorator sits on this helper rather than on the
    # individual tests; presumably it makes the call from setUp() skip the
    # test when mysql_requirement is not met -- confirm against
    # tests.skip_unless.
    @skip_unless(mysql_requirement)
    def create_db(self):
        """Create a uniquely named database and record it in ``self._auth['db']``.

        The name is built from the process id and the current time in
        milliseconds.
        """
        try:
            self.drop_db()
        except Exception:
            # Best effort only: there may be no previous database to drop
            # (for instance no 'db' entry in self._auth yet).
            pass
        dbname = 'test_%d_%d' % (os.getpid(), int(time.time() * 1000))
        conn = MySQLdb.connect(**self._auth)
        try:
            cursor = conn.cursor()
            cursor.execute("create database " + dbname)
            cursor.close()
        finally:
            # Close the administrative connection explicitly instead of
            # relying on garbage collection.
            conn.close()
        self._auth['db'] = dbname

    def drop_db(self):
        """Drop the database named by ``self._auth['db']``.

        :raises KeyError: if no database name has been recorded yet.
        """
        conn = MySQLdb.connect(**self._auth)
        try:
            cursor = conn.cursor()
            cursor.execute("drop database " + self._auth['db'])
            cursor.close()
        finally:
            conn.close()

    def set_up_dummy_table(self, connection=None):
        """Create the temporary ``test_table`` (see ``dummy_table_sql``).

        :param connection: connection to create the table on.  When omitted,
            ``self.connection`` is used if it is set; otherwise a private
            connection is opened for the statement and closed afterwards.
        """
        # Only a connection opened here is ours to close.  A caller-supplied
        # connection, or the shared self.connection, must stay open for the
        # rest of the test.
        close_connection = False
        if connection is None:
            if self.connection is None:
                connection = MySQLdb.connect(**self._auth)
                close_connection = True
            else:
                connection = self.connection

        try:
            cursor = connection.cursor()
            cursor.execute(self.dummy_table_sql)
            connection.commit()
            cursor.close()
        finally:
            if close_connection:
                connection.close()

    # SQL used by set_up_dummy_table(); one column per kind of value.
    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
        (
        row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
        value_int INTEGER,
        value_float FLOAT,
        value_string VARCHAR(200),
        value_uuid CHAR(36),
        value_binary BLOB,
        value_binary_string VARCHAR(200) BINARY,
        value_enum ENUM('Y','N'),
        created TIMESTAMP
        ) ENGINE=InnoDB;"""

    def assert_cursor_yields(self, curs):
        """Assert that running a query on ``curs`` lets other greenthreads run.

        A background greenthread bumps a counter every time it is scheduled;
        the counter must be non-zero once ``select 1`` has been executed and
        fetched.
        """
        counter = [0]

        def tick():
            while True:
                counter[0] += 1
                eventlet.sleep()
        gt = eventlet.spawn(tick)
        try:
            curs.execute("select 1")
            rows = curs.fetchall()
            self.assertEqual(len(rows), 1)
            self.assertEqual(len(rows[0]), 1)
            self.assertEqual(rows[0][0], 1)
            assert counter[0] > 0, counter[0]
        finally:
            # Always stop the endless ticker, even when an assertion above
            # fails, so it does not keep running into later tests.
            gt.kill()

    def assert_cursor_works(self, cursor):
        """Assert ``select 1`` returns exactly one row ``(1,)`` and yields."""
        cursor.execute("select 1")
        rows = cursor.fetchall()
        self.assertEqual(len(rows), 1)
        self.assertEqual(len(rows[0]), 1)
        self.assertEqual(rows[0][0], 1)
        self.assert_cursor_yields(cursor)

    def assert_connection_works(self, conn):
        """Assert that a cursor obtained from ``conn`` works."""
        curs = conn.cursor()
        try:
            self.assert_cursor_works(curs)
        finally:
            curs.close()

    def test_module_attributes(self):
        """The green module exposes every attribute of the real MySQLdb."""
        import MySQLdb as orig
        for key in dir(orig):
            if key not in ('__author__', '__path__', '__revision__',
                           '__version__', '__loader__'):
                assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key))

    def test_connecting(self):
        """setUp managed to open a connection."""
        assert self.connection is not None

    def test_connecting_annoyingly(self):
        """Every alternative spelling of the connection factory works."""
        factories = (
            MySQLdb.Connect,
            MySQLdb.Connection,
            MySQLdb.connections.Connection,
        )
        for factory in factories:
            conn = factory(**self._auth)
            try:
                self.assert_connection_works(conn)
            finally:
                # Close explicitly so no connection to the scratch database
                # is still open when tearDown() drops it.
                conn.close()

    def test_create_cursor(self):
        """A cursor can be created and closed."""
        cursor = self.connection.cursor()
        cursor.close()

    def test_run_query(self):
        """A trivial query works on the shared connection."""
        cursor = self.connection.cursor()
        try:
            self.assert_cursor_works(cursor)
        finally:
            cursor.close()

    def test_run_bad_query(self):
        """Executing invalid SQL raises a MySQLdb error."""
        cursor = self.connection.cursor()
        try:
            self.assertRaises(
                MySQLdb.Error, cursor.execute, "garbage blah blah")
        finally:
            cursor.close()

    def fill_up_table(self, conn):
        """Insert 1000 rows (``value_int`` 0..999) into ``test_table``."""
        curs = conn.cursor()
        for i in range(1000):
            # Let the driver substitute the value instead of formatting SQL.
            curs.execute(
                'insert into test_table (value_int) values (%s)', (i,))
        conn.commit()
        curs.close()

    def test_yields(self):
        """A query in a spawned greenthread runs once the main one waits.

        ``results`` must be ``[1]`` right after spawning and ``[1, 2]`` after
        waiting on the event the greenthread sends when its query is done.
        """
        conn = self.connection
        self.set_up_dummy_table(conn)
        self.fill_up_table(conn)
        curs = conn.cursor()
        results = []
        SHORT_QUERY = "select * from test_table"
        evt = event.Event()

        def a_query():
            self.assert_cursor_works(curs)
            curs.execute(SHORT_QUERY)
            results.append(2)
            evt.send()
        eventlet.spawn(a_query)
        results.append(1)
        self.assertEqual([1], results)
        evt.wait()
        self.assertEqual([1, 2], results)

    def test_visibility_from_other_connections(self):
        """A row committed on one connection is visible from the others."""
        conn = MySQLdb.connect(**self._auth)
        conn2 = MySQLdb.connect(**self._auth)
        conn3 = None
        curs = conn.cursor()
        try:
            curs2 = conn2.cursor()
            curs2.execute(
                "insert into gargleblatz (a) values (%s)", (314159,))
            self.assertEqual(curs2.rowcount, 1)
            conn2.commit()
            selection_query = "select * from gargleblatz"
            curs2.execute(selection_query)
            self.assertEqual(curs2.rowcount, 1)
            # Done with the writing connection: close it explicitly rather
            # than counting on `del` to trigger the close.
            curs2.close()
            conn2.close()
            conn2 = None
            # create a new connection, it should see the addition
            conn3 = MySQLdb.connect(**self._auth)
            curs3 = conn3.cursor()
            curs3.execute(selection_query)
            self.assertEqual(curs3.rowcount, 1)
            # now, does the already-open connection see it?
            curs.execute(selection_query)
            self.assertEqual(curs.rowcount, 1)
            curs3.close()
            conn3.close()
            conn3 = None
        finally:
            try:
                # clean up my litter
                curs.execute("delete from gargleblatz where a=314159")
                conn.commit()
            finally:
                # Close whatever is still open so tearDown() can drop the
                # database without lingering sessions.
                for leftover in (conn, conn2, conn3):
                    if leftover is not None:
                        leftover.close()
230
231
class TestMonkeyPatch(LimitedTestCase):
    """Checks the output of the ``mysqldb_test_monkey_patch.py`` helper script."""

    @skip_unless(mysql_requirement)
    def test_monkey_patching(self):
        """Run the helper script next to this file and verify its output.

        The script must print exactly two lines: the list of patched module
        names (an optional ``psycopg,`` entry is ignored) and
        ``connect True``.
        """
        here = os.path.dirname(os.path.abspath(__file__))
        script = os.path.join(here, 'mysqldb_test_monkey_patch.py')

        output = run_python(script)
        lines = output.splitlines()
        self.assertEqual(len(lines), 2, output)

        # Safe to unpack: the assertion above guarantees exactly two lines.
        modules_line, connect_line = lines
        modules_line = modules_line.replace("psycopg,", "")
        self.assertEqual(
            modules_line,
            'mysqltest MySQLdb,os,select,socket,thread,time',
        )
        self.assertEqual(connect_line, "connect True")