from neutron.agent.linux import utils
from neutron.common import constants as n_const
from neutron.openstack.common import uuidutils
-from neutron.tests.functional.agent.linux import pinger
+from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base as functional_base
def setUp(self):
super(BaseIPVethTestCase, self).setUp()
self.check_sudo_enabled()
- self.pinger = pinger.Pinger(self)
+ self.pinger = helpers.Pinger(self)
@staticmethod
def _set_ip_up(device, cidr, broadcast, ip_version=4):
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import eventlet
+
+
+def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
+ """
+ Wait until callable predicate is evaluated as True
+
+ :param predicate: Callable deciding whether waiting should continue.
+ Best practice is to instantiate predicate with functools.partial()
+ :param timeout: Timeout in seconds how long should function wait.
+ :param sleep: Polling interval for results in seconds.
+ :param exception: Exception class for eventlet.Timeout.
+ (see doc for eventlet.Timeout for more information)
+ """
+ with eventlet.timeout.Timeout(timeout, exception):
+ while not predicate():
+ eventlet.sleep(sleep)
class Pinger(object):
# under the License.
import copy
+import functools
import fixtures
import mock
from neutron.openstack.common import uuidutils
from neutron.tests.common.agents import l3_agent as l3_test_agent
from neutron.tests.functional.agent.linux import base
+from neutron.tests.functional.agent.linux import helpers
from neutron.tests.unit import test_l3_agent
LOG = logging.getLogger(__name__)
router = self.manage_router(self.agent, router_info)
if enable_ha:
- self.wait_until(lambda: router.ha_state == 'master')
+ helpers.wait_until_true(lambda: router.ha_state == 'master')
# Keepalived notifies of a state transition when it starts,
# not when it ends. Thus, we have to wait until keepalived finishes
# configuring everything. We verify this by waiting until the last
# device has an IP address.
device = router.router[l3_constants.INTERFACE_KEY][-1]
- self.wait_until(self.device_exists_with_ip_mac, device,
- self.agent.get_internal_device_name,
- router.ns_name)
+ device_exists = functools.partial(
+ self.device_exists_with_ip_mac,
+ device,
+ self.agent.get_internal_device_name,
+ router.ns_name)
+ helpers.wait_until_true(device_exists)
self.assertTrue(self._namespace_exists(router))
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
router2 = self.manage_router(self.failover_agent, router_info_2)
- self.wait_until(lambda: router1.ha_state == 'master')
- self.wait_until(lambda: router2.ha_state == 'backup')
+ helpers.wait_until_true(lambda: router1.ha_state == 'master')
+ helpers.wait_until_true(lambda: router2.ha_state == 'backup')
device_name = self.agent.get_ha_device_name(
router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
router1.ns_name)
ha_device.link.set_down()
- self.wait_until(lambda: router2.ha_state == 'master')
- self.wait_until(lambda: router1.ha_state == 'fault')
+ helpers.wait_until_true(lambda: router2.ha_state == 'master')
+ helpers.wait_until_true(lambda: router1.ha_state == 'fault')
# under the License.
import os
-import time
from neutron.tests import base
SUDO_CMD = 'sudo -n'
-TIMEOUT = 60
-SLEEP_INTERVAL = 1
class BaseSudoTestCase(base.BaseTestCase):
def check_sudo_enabled(self):
if not self.sudo_enabled:
self.skipTest('testing with sudo is not enabled')
-
- def wait_until(self, predicate, *args, **kwargs):
- with self.assert_max_execution_time(TIMEOUT):
- while not predicate(*args, **kwargs):
- time.sleep(SLEEP_INTERVAL)