Add python-eventlet package to MOS 9.0 repository
[packages/trusty/python-eventlet.git] / python-eventlet / tests / db_pool_test.py
index 9fc9ebc2005232b80486d368c3be116341cd6eb7..da72be44057817533616719d94df39a3d1bed759 100644 (file)
@@ -7,7 +7,7 @@ import os
 import traceback
 from unittest import TestCase, main
 
-from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
+from tests import mock, skip_unless, skip_with_pyevent, get_database_auth
 from eventlet import event
 from eventlet import db_pool
 from eventlet.support import six
@@ -116,14 +116,6 @@ class DBConnectionPool(DBTester):
         assert self.pool.free() == 1
         self.assertRaises(AttributeError, self.connection.cursor)
 
-    @skipped
-    def test_deletion_does_a_put(self):
-        # doing a put on del causes some issues if __del__ is called in the
-        # main coroutine, so, not doing that for now
-        assert self.pool.free() == 0
-        self.connection = None
-        assert self.pool.free() == 1
-
     def test_put_doesnt_double_wrap(self):
         self.pool.put(self.connection)
         conn = self.pool.get()
@@ -213,45 +205,6 @@ class DBConnectionPool(DBTester):
             conn.commit()
             self.pool.put(conn)
 
-    @skipped
-    def test_two_simultaneous_connections(self):
-        # timing-sensitive test, disabled until we come up with a better
-        # way to do this
-        self.pool = self.create_pool(max_size=2)
-        conn = self.pool.get()
-        self.set_up_dummy_table(conn)
-        self.fill_up_table(conn)
-        curs = conn.cursor()
-        conn2 = self.pool.get()
-        self.set_up_dummy_table(conn2)
-        self.fill_up_table(conn2)
-        curs2 = conn2.cursor()
-        results = []
-        LONG_QUERY = "select * from test_table"
-        SHORT_QUERY = "select * from test_table where row_id <= 20"
-
-        evt = event.Event()
-
-        def long_running_query():
-            self.assert_cursor_works(curs)
-            curs.execute(LONG_QUERY)
-            results.append(1)
-            evt.send()
-        evt2 = event.Event()
-
-        def short_running_query():
-            self.assert_cursor_works(curs2)
-            curs2.execute(SHORT_QUERY)
-            results.append(2)
-            evt2.send()
-
-        eventlet.spawn(long_running_query)
-        eventlet.spawn(short_running_query)
-        evt.wait()
-        evt2.wait()
-        results.sort()
-        self.assertEqual([1, 2], results)
-
     def test_clear(self):
         self.pool = self.create_pool()
         self.pool.put(self.connection)
@@ -318,80 +271,6 @@ class DBConnectionPool(DBTester):
         self.connection.close()
         self.assertEqual(len(self.pool.free_items), 0)
 
-    @skipped
-    def test_max_idle(self):
-        # This test is timing-sensitive.  Rename the function without
-        # the "dont" to run it, but beware that it could fail or take
-        # a while.
-
-        self.pool = self.create_pool(max_size=2, max_idle=0.02)
-        self.connection = self.pool.get()
-        self.connection.close()
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0.01)  # not long enough to trigger the idle timeout
-        self.assertEqual(len(self.pool.free_items), 1)
-        self.connection = self.pool.get()
-        self.connection.close()
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0.01)  # idle timeout should have fired but done nothing
-        self.assertEqual(len(self.pool.free_items), 1)
-        self.connection = self.pool.get()
-        self.connection.close()
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0.03)  # long enough to trigger idle timeout for real
-        self.assertEqual(len(self.pool.free_items), 0)
-
-    @skipped
-    def test_max_idle_many(self):
-        # This test is timing-sensitive.  Rename the function without
-        # the "dont" to run it, but beware that it could fail or take
-        # a while.
-
-        self.pool = self.create_pool(max_size=2, max_idle=0.02)
-        self.connection, conn2 = self.pool.get(), self.pool.get()
-        self.connection.close()
-        eventlet.sleep(0.01)
-        self.assertEqual(len(self.pool.free_items), 1)
-        conn2.close()
-        self.assertEqual(len(self.pool.free_items), 2)
-        eventlet.sleep(0.02)  # trigger cleanup of conn1 but not conn2
-        self.assertEqual(len(self.pool.free_items), 1)
-
-    @skipped
-    def test_max_age(self):
-        # This test is timing-sensitive.  Rename the function without
-        # the "dont" to run it, but beware that it could fail or take
-        # a while.
-
-        self.pool = self.create_pool(max_size=2, max_age=0.05)
-        self.connection = self.pool.get()
-        self.connection.close()
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0.01)  # not long enough to trigger the age timeout
-        self.assertEqual(len(self.pool.free_items), 1)
-        self.connection = self.pool.get()
-        self.connection.close()
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0.05)  # long enough to trigger age timeout
-        self.assertEqual(len(self.pool.free_items), 0)
-
-    @skipped
-    def test_max_age_many(self):
-        # This test is timing-sensitive.  Rename the function without
-        # the "dont" to run it, but beware that it could fail or take
-        # a while.
-
-        self.pool = self.create_pool(max_size=2, max_age=0.15)
-        self.connection, conn2 = self.pool.get(), self.pool.get()
-        self.connection.close()
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0)  # not long enough to trigger the age timeout
-        self.assertEqual(len(self.pool.free_items), 1)
-        eventlet.sleep(0.2)  # long enough to trigger age timeout
-        self.assertEqual(len(self.pool.free_items), 0)
-        conn2.close()  # should not be added to the free items
-        self.assertEqual(len(self.pool.free_items), 0)
-
     def test_waiters_get_woken(self):
         # verify that when there's someone waiting on an empty pool
         # and someone puts an immediately-closed connection back in
@@ -421,29 +300,6 @@ class DBConnectionPool(DBTester):
         self.assertEqual(self.pool.waiting(), 0)
         self.pool.put(conn)
 
-    @skipped
-    def test_0_straight_benchmark(self):
-        """ Benchmark; don't run unless you want to wait a while."""
-        import time
-        iterations = 20000
-        c = self.connection.cursor()
-        self.connection.commit()
-
-        def bench(c):
-            for i in six.moves.range(iterations):
-                c.execute('select 1')
-
-        bench(c)  # warm-up
-        results = []
-        for i in range(3):
-            start = time.time()
-            bench(c)
-            end = time.time()
-            results.append(end - start)
-
-        print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
-            iterations, sum(results) / len(results), results, type(self)))
-
     def test_raising_create(self):
         # if the create() method raises an exception the pool should
         # not lose any connections