X-Git-Url: https://review.fuel-infra.org/gitweb?a=blobdiff_plain;f=eventlet%2Ftests%2Fsemaphore_test.py;fp=eventlet%2Ftests%2Fsemaphore_test.py;h=0000000000000000000000000000000000000000;hb=358bd9258c2b6d2ee74de4dfd07a5123107abad4;hp=13163302480d06ae6d8ef7e07b3749aa570b2f4d;hpb=376ff3bfe7071cc0793184a378c4e74508fb0d97;p=packages%2Ftrusty%2Fpython-eventlet.git diff --git a/eventlet/tests/semaphore_test.py b/eventlet/tests/semaphore_test.py deleted file mode 100644 index 1316330..0000000 --- a/eventlet/tests/semaphore_test.py +++ /dev/null @@ -1,49 +0,0 @@ -import time -import unittest - -import eventlet -from eventlet import semaphore -from tests import LimitedTestCase - - -class TestSemaphore(LimitedTestCase): - - def test_bounded(self): - sem = semaphore.CappedSemaphore(2, limit=3) - self.assertEqual(sem.acquire(), True) - self.assertEqual(sem.acquire(), True) - gt1 = eventlet.spawn(sem.release) - self.assertEqual(sem.acquire(), True) - self.assertEqual(-3, sem.balance) - sem.release() - sem.release() - sem.release() - gt2 = eventlet.spawn(sem.acquire) - sem.release() - self.assertEqual(3, sem.balance) - gt1.wait() - gt2.wait() - - def test_bounded_with_zero_limit(self): - sem = semaphore.CappedSemaphore(0, 0) - gt = eventlet.spawn(sem.acquire) - sem.release() - gt.wait() - - def test_non_blocking(self): - sem = semaphore.Semaphore(0) - self.assertEqual(sem.acquire(blocking=False), False) - - def test_timeout(self): - sem = semaphore.Semaphore(0) - start = time.time() - self.assertEqual(sem.acquire(timeout=0.1), False) - self.assertTrue(time.time() - start >= 0.1) - - def test_timeout_non_blocking(self): - sem = semaphore.Semaphore() - self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1) - - -if __name__ == '__main__': - unittest.main()