X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Ftests%2Fmysqldb_test.py;fp=eventlet%2Ftests%2Fmysqldb_test.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=e5a87d3e739cb79597fe60a1c8255515f69de4c2;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/tests/mysqldb_test.py b/eventlet/tests/mysqldb_test.py deleted file mode 100644 index e5a87d3..0000000 --- a/eventlet/tests/mysqldb_test.py +++ /dev/null @@ -1,244 +0,0 @@ -from __future__ import print_function - -import os -import time -import traceback - -import eventlet -from eventlet import event -from tests import ( - LimitedTestCase, - run_python, - skip_unless, using_pyevent, get_database_auth, -) -try: - from eventlet.green import MySQLdb -except ImportError: - MySQLdb = False - - -def mysql_requirement(_f): - """We want to skip tests if using pyevent, MySQLdb is not installed, or if - there is no database running on the localhost that the auth file grants - us access to. - - This errs on the side of skipping tests if everything is not right, but - it's better than a million tests failing when you don't care about mysql - support.""" - if using_pyevent(_f): - return False - if MySQLdb is False: - print("Skipping mysql tests, MySQLdb not importable") - return False - try: - auth = get_database_auth()['MySQLdb'].copy() - MySQLdb.connect(**auth) - return True - except MySQLdb.OperationalError: - print("Skipping mysql tests, error when connecting:") - traceback.print_exc() - return False - - -class TestMySQLdb(LimitedTestCase): - def setUp(self): - self._auth = get_database_auth()['MySQLdb'] - self.create_db() - self.connection = None - self.connection = MySQLdb.connect(**self._auth) - cursor = self.connection.cursor() - cursor.execute("""CREATE TABLE gargleblatz - ( - a INTEGER - );""") - self.connection.commit() - cursor.close() - - super(TestMySQLdb, self).setUp() - - def tearDown(self): - if self.connection: - self.connection.close() - self.drop_db() - - super(TestMySQLdb, self).tearDown() - - @skip_unless(mysql_requirement) - def create_db(self): - auth = self._auth.copy() - try: - self.drop_db() - except Exception: - pass - dbname = 'test_%d_%d' % (os.getpid(), int(time.time() * 1000)) - db = MySQLdb.connect(**auth).cursor() - db.execute("create database " + dbname) - db.close() - self._auth['db'] = dbname - del db - - def drop_db(self): - db = MySQLdb.connect(**self._auth).cursor() - db.execute("drop database " + self._auth['db']) - db.close() - del db - - def set_up_dummy_table(self, connection=None): - close_connection = False - if connection is None: - close_connection = True - if self.connection is None: - connection = MySQLdb.connect(**self._auth) - else: - connection = self.connection - - cursor = connection.cursor() - cursor.execute(self.dummy_table_sql) - connection.commit() - cursor.close() - if close_connection: - connection.close() - - dummy_table_sql = """CREATE TEMPORARY TABLE test_table - ( - row_id INTEGER PRIMARY KEY AUTO_INCREMENT, - value_int INTEGER, - value_float FLOAT, - value_string VARCHAR(200), - value_uuid CHAR(36), - value_binary BLOB, - value_binary_string VARCHAR(200) BINARY, - value_enum ENUM('Y','N'), - created TIMESTAMP - ) ENGINE=InnoDB;""" - - def assert_cursor_yields(self, curs): - counter = [0] - - def tick(): - while True: - counter[0] += 1 - eventlet.sleep() - gt = eventlet.spawn(tick) - curs.execute("select 1") - rows = curs.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(len(rows[0]), 1) - self.assertEqual(rows[0][0], 1) - assert counter[0] > 0, counter[0] - gt.kill() - - def assert_cursor_works(self, cursor): - cursor.execute("select 1") - rows = cursor.fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(len(rows[0]), 1) - self.assertEqual(rows[0][0], 1) - self.assert_cursor_yields(cursor) - - def assert_connection_works(self, conn): - curs = conn.cursor() - self.assert_cursor_works(curs) - - def test_module_attributes(self): - import MySQLdb as orig - for key in dir(orig): - if key not in ('__author__', '__path__', '__revision__', - '__version__', '__loader__'): - assert hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key)) - - def test_connecting(self): - assert self.connection is not None - - def test_connecting_annoyingly(self): - self.assert_connection_works(MySQLdb.Connect(**self._auth)) - self.assert_connection_works(MySQLdb.Connection(**self._auth)) - self.assert_connection_works(MySQLdb.connections.Connection(**self._auth)) - - def test_create_cursor(self): - cursor = self.connection.cursor() - cursor.close() - - def test_run_query(self): - cursor = self.connection.cursor() - self.assert_cursor_works(cursor) - cursor.close() - - def test_run_bad_query(self): - cursor = self.connection.cursor() - try: - cursor.execute("garbage blah blah") - assert False - except AssertionError: - raise - except Exception: - pass - cursor.close() - - def fill_up_table(self, conn): - curs = conn.cursor() - for i in range(1000): - curs.execute('insert into test_table (value_int) values (%s)' % i) - conn.commit() - - def test_yields(self): - conn = self.connection - self.set_up_dummy_table(conn) - self.fill_up_table(conn) - curs = conn.cursor() - results = [] - SHORT_QUERY = "select * from test_table" - evt = event.Event() - - def a_query(): - self.assert_cursor_works(curs) - curs.execute(SHORT_QUERY) - results.append(2) - evt.send() - eventlet.spawn(a_query) - results.append(1) - self.assertEqual([1], results) - evt.wait() - self.assertEqual([1, 2], results) - - def test_visibility_from_other_connections(self): - conn = MySQLdb.connect(**self._auth) - conn2 = MySQLdb.connect(**self._auth) - curs = conn.cursor() - try: - curs2 = conn2.cursor() - curs2.execute("insert into gargleblatz (a) values (%s)" % (314159)) - self.assertEqual(curs2.rowcount, 1) - conn2.commit() - selection_query = "select * from gargleblatz" - curs2.execute(selection_query) - self.assertEqual(curs2.rowcount, 1) - del curs2, conn2 - # create a new connection, it should see the addition - conn3 = MySQLdb.connect(**self._auth) - curs3 = conn3.cursor() - curs3.execute(selection_query) - self.assertEqual(curs3.rowcount, 1) - # now, does the already-open connection see it? - curs.execute(selection_query) - self.assertEqual(curs.rowcount, 1) - del curs3, conn3 - finally: - # clean up my litter - curs.execute("delete from gargleblatz where a=314159") - conn.commit() - - -class TestMonkeyPatch(LimitedTestCase): - @skip_unless(mysql_requirement) - def test_monkey_patching(self): - testcode_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - 'mysqldb_test_monkey_patch.py', - ) - output = run_python(testcode_path) - lines = output.splitlines() - self.assertEqual(len(lines), 2, output) - self.assertEqual(lines[0].replace("psycopg,", ""), - 'mysqltest MySQLdb,os,select,socket,thread,time') - self.assertEqual(lines[1], "connect True")