from __future__ import print_function import os import time import traceback import eventlet from eventlet import event from tests import ( LimitedTestCase, run_python, skip_unless, using_pyevent, get_database_auth, ) try: from eventlet.green import MySQLdb except ImportError: MySQLdb = False def mysql_requirement(_f): """We want to skip tests if using pyevent, MySQLdb is not installed, or if there is no database running on the localhost that the auth file grants us access to. This errs on the side of skipping tests if everything is not right, but it's better than a million tests failing when you don't care about mysql support.""" if using_pyevent(_f): return False if MySQLdb is False: print("Skipping mysql tests, MySQLdb not importable") return False try: auth = get_database_auth()['MySQLdb'].copy() MySQLdb.connect(**auth) return True except MySQLdb.OperationalError: print("Skipping mysql tests, error when connecting:") traceback.print_exc() return False class TestMySQLdb(LimitedTestCase): def setUp(self): self._auth = get_database_auth()['MySQLdb'] self.create_db() self.connection = None self.connection = MySQLdb.connect(**self._auth) cursor = self.connection.cursor() cursor.execute("""CREATE TABLE gargleblatz ( a INTEGER );""") self.connection.commit() cursor.close() super(TestMySQLdb, self).setUp() def tearDown(self): if self.connection: self.connection.close() self.drop_db() super(TestMySQLdb, self).tearDown() @skip_unless(mysql_requirement) def create_db(self): auth = self._auth.copy() try: self.drop_db() except Exception: pass dbname = 'test_%d_%d' % (os.getpid(), int(time.time() * 1000)) db = MySQLdb.connect(**auth).cursor() db.execute("create database " + dbname) db.close() self._auth['db'] = dbname del db def drop_db(self): db = MySQLdb.connect(**self._auth).cursor() db.execute("drop database " + self._auth['db']) db.close() del db def set_up_dummy_table(self, connection=None): close_connection = False if connection is None: close_connection = True if self.connection is None: connection = MySQLdb.connect(**self._auth) else: connection = self.connection cursor = connection.cursor() cursor.execute(self.dummy_table_sql) connection.commit() cursor.close() if close_connection: connection.close() dummy_table_sql = """CREATE TEMPORARY TABLE test_table ( row_id INTEGER PRIMARY KEY AUTO_INCREMENT, value_int INTEGER, value_float FLOAT, value_string VARCHAR(200), value_uuid CHAR(36), value_binary BLOB, value_binary_string VARCHAR(200) BINARY, value_enum ENUM('Y','N'), created TIMESTAMP ) ENGINE=InnoDB;""" def assert_cursor_yields(self, curs): counter = [0] def tick(): while True: counter[0] += 1 eventlet.sleep() gt = eventlet.spawn(tick) curs.execute("select 1") rows = curs.fetchall() self.assertEqual(len(rows), 1) self.assertEqual(len(rows[0]), 1) self.assertEqual(rows[0][0], 1) assert counter[0] > 0, counter[0] gt.kill() def assert_cursor_works(self, cursor): cursor.execute("select 1") rows = cursor.fetchall() self.assertEqual(len(rows), 1) self.assertEqual(len(rows[0]), 1) self.assertEqual(rows[0][0], 1) self.assert_cursor_yields(cursor) def assert_connection_works(self, conn): curs = conn.cursor() self.assert_cursor_works(curs) def test_module_attributes(self): import MySQLdb as orig for key in dir(orig): if key not in ('__author__', '__path__', '__revision__', '__version__', '__loader__'): assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key)) def test_connecting(self): assert self.connection is not None def test_connecting_annoyingly(self): self.assert_connection_works(MySQLdb.Connect(**self._auth)) self.assert_connection_works(MySQLdb.Connection(**self._auth)) self.assert_connection_works(MySQLdb.connections.Connection(**self._auth)) def test_create_cursor(self): cursor = self.connection.cursor() cursor.close() def test_run_query(self): cursor = self.connection.cursor() self.assert_cursor_works(cursor) cursor.close() def test_run_bad_query(self): cursor = self.connection.cursor() try: cursor.execute("garbage blah blah") assert False except AssertionError: raise except Exception: pass cursor.close() def fill_up_table(self, conn): curs = conn.cursor() for i in range(1000): curs.execute('insert into test_table (value_int) values (%s)' % i) conn.commit() def test_yields(self): conn = self.connection self.set_up_dummy_table(conn) self.fill_up_table(conn) curs = conn.cursor() results = [] SHORT_QUERY = "select * from test_table" evt = event.Event() def a_query(): self.assert_cursor_works(curs) curs.execute(SHORT_QUERY) results.append(2) evt.send() eventlet.spawn(a_query) results.append(1) self.assertEqual([1], results) evt.wait() self.assertEqual([1, 2], results) def test_visibility_from_other_connections(self): conn = MySQLdb.connect(**self._auth) conn2 = MySQLdb.connect(**self._auth) curs = conn.cursor() try: curs2 = conn2.cursor() curs2.execute("insert into gargleblatz (a) values (%s)" % (314159)) self.assertEqual(curs2.rowcount, 1) conn2.commit() selection_query = "select * from gargleblatz" curs2.execute(selection_query) self.assertEqual(curs2.rowcount, 1) del curs2, conn2 # create a new connection, it should see the addition conn3 = MySQLdb.connect(**self._auth) curs3 = conn3.cursor() curs3.execute(selection_query) self.assertEqual(curs3.rowcount, 1) # now, does the already-open connection see it? curs.execute(selection_query) self.assertEqual(curs.rowcount, 1) del curs3, conn3 finally: # clean up my litter curs.execute("delete from gargleblatz where a=314159") conn.commit() class TestMonkeyPatch(LimitedTestCase): @skip_unless(mysql_requirement) def test_monkey_patching(self): testcode_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'mysqldb_test_monkey_patch.py', ) output = run_python(testcode_path) lines = output.splitlines() self.assertEqual(len(lines), 2, output) self.assertEqual(lines[0].replace("psycopg,", ""), 'mysqltest MySQLdb,os,select,socket,thread,time') self.assertEqual(lines[1], "connect True")