import traceback
from unittest import TestCase, main
-from tests import mock, skipped, skip_unless, skip_with_pyevent, get_database_auth
+from tests import mock, skip_unless, skip_with_pyevent, get_database_auth
from eventlet import event
from eventlet import db_pool
from eventlet.support import six
assert self.pool.free() == 1
self.assertRaises(AttributeError, self.connection.cursor)
- @skipped
- def test_deletion_does_a_put(self):
- # doing a put on del causes some issues if __del__ is called in the
- # main coroutine, so, not doing that for now
- assert self.pool.free() == 0
- self.connection = None
- assert self.pool.free() == 1
-
def test_put_doesnt_double_wrap(self):
self.pool.put(self.connection)
conn = self.pool.get()
conn.commit()
self.pool.put(conn)
- @skipped
- def test_two_simultaneous_connections(self):
- # timing-sensitive test, disabled until we come up with a better
- # way to do this
- self.pool = self.create_pool(max_size=2)
- conn = self.pool.get()
- self.set_up_dummy_table(conn)
- self.fill_up_table(conn)
- curs = conn.cursor()
- conn2 = self.pool.get()
- self.set_up_dummy_table(conn2)
- self.fill_up_table(conn2)
- curs2 = conn2.cursor()
- results = []
- LONG_QUERY = "select * from test_table"
- SHORT_QUERY = "select * from test_table where row_id <= 20"
-
- evt = event.Event()
-
- def long_running_query():
- self.assert_cursor_works(curs)
- curs.execute(LONG_QUERY)
- results.append(1)
- evt.send()
- evt2 = event.Event()
-
- def short_running_query():
- self.assert_cursor_works(curs2)
- curs2.execute(SHORT_QUERY)
- results.append(2)
- evt2.send()
-
- eventlet.spawn(long_running_query)
- eventlet.spawn(short_running_query)
- evt.wait()
- evt2.wait()
- results.sort()
- self.assertEqual([1, 2], results)
-
def test_clear(self):
self.pool = self.create_pool()
self.pool.put(self.connection)
self.connection.close()
self.assertEqual(len(self.pool.free_items), 0)
- @skipped
- def test_max_idle(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_idle=0.02)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.01) # not long enough to trigger the idle timeout
- self.assertEqual(len(self.pool.free_items), 1)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.01) # idle timeout should have fired but done nothing
- self.assertEqual(len(self.pool.free_items), 1)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.03) # long enough to trigger idle timeout for real
- self.assertEqual(len(self.pool.free_items), 0)
-
- @skipped
- def test_max_idle_many(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_idle=0.02)
- self.connection, conn2 = self.pool.get(), self.pool.get()
- self.connection.close()
- eventlet.sleep(0.01)
- self.assertEqual(len(self.pool.free_items), 1)
- conn2.close()
- self.assertEqual(len(self.pool.free_items), 2)
- eventlet.sleep(0.02) # trigger cleanup of conn1 but not conn2
- self.assertEqual(len(self.pool.free_items), 1)
-
- @skipped
- def test_max_age(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_age=0.05)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.01) # not long enough to trigger the age timeout
- self.assertEqual(len(self.pool.free_items), 1)
- self.connection = self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.05) # long enough to trigger age timeout
- self.assertEqual(len(self.pool.free_items), 0)
-
- @skipped
- def test_max_age_many(self):
- # This test is timing-sensitive. Rename the function without
- # the "dont" to run it, but beware that it could fail or take
- # a while.
-
- self.pool = self.create_pool(max_size=2, max_age=0.15)
- self.connection, conn2 = self.pool.get(), self.pool.get()
- self.connection.close()
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0) # not long enough to trigger the age timeout
- self.assertEqual(len(self.pool.free_items), 1)
- eventlet.sleep(0.2) # long enough to trigger age timeout
- self.assertEqual(len(self.pool.free_items), 0)
- conn2.close() # should not be added to the free items
- self.assertEqual(len(self.pool.free_items), 0)
-
def test_waiters_get_woken(self):
# verify that when there's someone waiting on an empty pool
# and someone puts an immediately-closed connection back in
self.assertEqual(self.pool.waiting(), 0)
self.pool.put(conn)
- @skipped
- def test_0_straight_benchmark(self):
- """ Benchmark; don't run unless you want to wait a while."""
- import time
- iterations = 20000
- c = self.connection.cursor()
- self.connection.commit()
-
- def bench(c):
- for i in six.moves.range(iterations):
- c.execute('select 1')
-
- bench(c) # warm-up
- results = []
- for i in range(3):
- start = time.time()
- bench(c)
- end = time.time()
- results.append(end - start)
-
- print("\n%u iterations took an average of %f seconds, (%s) in %s\n" % (
- iterations, sum(results) / len(results), results, type(self)))
-
def test_raising_create(self):
# if the create() method raises an exception the pool should
# not lose any connections