BR_PREFIX = 'test-br'
ICMP_BLOCK_RULE = '-p icmp -j DROP'
+VETH_PREFIX = 'tst-vth'
+
+
+#TODO(jschwarz): Move these two functions to neutron/tests/common/
+def get_rand_name(max_length=None, prefix='test'):
+ name = prefix + str(random.randint(1, 0x7fffffff))
+ return name[:max_length] if max_length is not None else name
+
+
+def get_rand_veth_name():
+ return get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN,
+ prefix=VETH_PREFIX)
class BaseLinuxTestCase(functional_base.BaseSudoTestCase):
self.skipTest(skip_msg)
raise
- def get_rand_name(self, max_length=None, prefix='test'):
- name = prefix + str(random.randint(1, 0x7fffffff))
- return name[:max_length] if max_length is not None else name
-
def create_resource(self, name_prefix, creation_func, *args, **kwargs):
"""Create a new resource that does not already exist.
:param *args *kwargs: These will be passed to the create function.
"""
while True:
- name = self.get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN,
- prefix=name_prefix)
+ name = get_rand_name(max_length=n_const.DEVICE_NAME_MAX_LEN,
+ prefix=name_prefix)
try:
return creation_func(name, *args, **kwargs)
except RuntimeError:
continue
+ def create_veth(self):
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper)
+ name1 = get_rand_veth_name()
+ name2 = get_rand_veth_name()
+ self.addCleanup(ip_wrapper.del_veth, name1)
+ veth1, veth2 = ip_wrapper.add_veth(name1, name2)
+ return veth1, veth2
+
class BaseOVSLinuxTestCase(BaseLinuxTestCase):
def setUp(self):
self.addCleanup(br.destroy)
return br
+ def get_ovs_bridge(self, br_name):
+ return ovs_lib.OVSBridge(br_name, self.root_helper)
+
class BaseIPVethTestCase(BaseLinuxTestCase):
SRC_ADDRESS = '192.168.0.1'
DST_ADDRESS = '192.168.0.2'
BROADCAST_ADDRESS = '192.168.0.255'
- SRC_VETH = 'source'
- DST_VETH = 'destination'
def setUp(self):
super(BaseIPVethTestCase, self).setUp()
src_addr = src_addr or self.SRC_ADDRESS
dst_addr = dst_addr or self.DST_ADDRESS
broadcast_addr = broadcast_addr or self.BROADCAST_ADDRESS
- src_veth = src_veth or self.SRC_VETH
- dst_veth = dst_veth or self.DST_VETH
+ src_veth = src_veth or get_rand_veth_name()
+ dst_veth = dst_veth or get_rand_veth_name()
src_ns = src_ns or self._create_namespace()
dst_ns = dst_ns or self._create_namespace()
import copy
+import fixtures
import mock
from oslo.config import cfg
-from neutron.agent.common import config
+from neutron.agent.common import config as agent_config
from neutron.agent import l3_agent
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
+from neutron.common import config as common_config
from neutron.common import constants as l3_constants
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
+from neutron.tests.common.agents import l3_agent as l3_test_agent
from neutron.tests.functional.agent.linux import base
from neutron.tests.unit import test_l3_agent
def setUp(self):
super(L3AgentTestFramework, self).setUp()
self.check_sudo_enabled()
- self._configure()
-
- def _configure(self):
- l3_agent._register_opts(cfg.CONF)
+ mock.patch('neutron.agent.l3_agent.L3PluginApi').start()
+ self.agent = self._configure_agent('agent1')
+
+ def _get_config_opts(self):
+ config = cfg.ConfigOpts()
+ config.register_opts(common_config.core_opts)
+ config.register_opts(common_config.core_cli_opts)
+ config.register_cli_opts(logging.common_cli_opts)
+ config.register_cli_opts(logging.logging_cli_opts)
+ config.register_opts(logging.generic_log_opts)
+ config.register_opts(logging.log_opts)
+ return config
+
+ def _configure_agent(self, host):
+ conf = self._get_config_opts()
+ l3_agent._register_opts(conf)
cfg.CONF.set_override('debug', False)
- config.setup_logging()
- cfg.CONF.set_override(
+ agent_config.setup_logging()
+ conf.set_override(
'interface_driver',
'neutron.agent.linux.interface.OVSInterfaceDriver')
- cfg.CONF.set_override('router_delete_namespaces', True)
- cfg.CONF.set_override('root_helper', self.root_helper, group='AGENT')
- cfg.CONF.set_override('use_namespaces', True)
- cfg.CONF.set_override('enable_metadata_proxy', True)
+ conf.set_override('router_delete_namespaces', True)
+ conf.set_override('root_helper', self.root_helper, group='AGENT')
br_int = self.create_ovs_bridge()
- cfg.CONF.set_override('ovs_integration_bridge', br_int.br_name)
br_ex = self.create_ovs_bridge()
- cfg.CONF.set_override('external_network_bridge', br_ex.br_name)
-
- mock.patch('neutron.agent.l3_agent.L3PluginApi').start()
-
- self.agent = l3_agent.L3NATAgent('localhost', cfg.CONF)
-
- mock.patch.object(self.agent, '_send_gratuitous_arp_packet').start()
-
- def manage_router(self, enable_ha):
- router = test_l3_agent.prepare_router_data(enable_snat=True,
- enable_floating_ip=True,
- enable_ha=enable_ha)
- self.addCleanup(self._delete_router, router['id'])
- ri = self._create_router(router)
+ conf.set_override('ovs_integration_bridge', br_int.br_name)
+ conf.set_override('external_network_bridge', br_ex.br_name)
+
+ temp_dir = self.useFixture(fixtures.TempDir()).path
+ conf.set_override('state_path', temp_dir)
+ conf.set_override('metadata_proxy_socket',
+ '%s/metadata_proxy' % temp_dir)
+ conf.set_override('ha_confs_path',
+ '%s/ha_confs' % temp_dir)
+ conf.set_override('external_pids',
+ '%s/external/pids' % temp_dir)
+ conf.set_override('host', host)
+ agent = l3_test_agent.TestL3NATAgent(host, conf)
+ mock.patch.object(agent, '_arping').start()
+
+ return agent
+
+ def generate_router_info(self, enable_ha):
+ return test_l3_agent.prepare_router_data(enable_snat=True,
+ enable_floating_ip=True,
+ enable_ha=enable_ha)
+
+ def manage_router(self, agent, router):
+ self.addCleanup(self._delete_router, agent, router['id'])
+ ri = self._create_router(agent, router)
return ri
- def _create_router(self, router):
- self.agent._router_added(router['id'], router)
- ri = self.agent.router_info[router['id']]
+ def _create_router(self, agent, router):
+ agent._router_added(router['id'], router)
+ ri = agent.router_info[router['id']]
ri.router = router
- self.agent.process_router(ri)
+ agent.process_router(ri)
return ri
- def _delete_router(self, router_id):
- self.agent._router_removed(router_id)
+ def _delete_router(self, agent, router_id):
+ agent._router_removed(router_id)
def _add_fip(self, router, fip_address, fixed_address='10.0.0.2'):
fip = {'id': _uuid(),
ip = ip_lib.IPWrapper(self.root_helper, router.ns_name)
return ip.netns.exists(router.ns_name)
- def _metadata_proxy_exists(self, router):
+ def _metadata_proxy_exists(self, conf, router):
pm = external_process.ProcessManager(
- cfg.CONF,
+ conf,
router.router_id,
self.root_helper,
router.ns_name)
namespace, self.root_helper)
def get_expected_keepalive_configuration(self, router):
- ha_confs_path = cfg.CONF.ha_confs_path
+ ha_confs_path = self.agent.conf.ha_confs_path
router_id = router.router_id
ha_device_name = self.agent.get_ha_device_name(router.ha_port['id'])
ha_device_cidr = router.ha_port['ip_cidr']
self._router_lifecycle(enable_ha=True)
def test_keepalived_configuration(self):
- router = self.manage_router(enable_ha=True)
+ router_info = self.generate_router_info(enable_ha=True)
+ router = self.manage_router(self.agent, router_info)
expected = self.get_expected_keepalive_configuration(router)
self.assertEqual(expected,
self.assertIn(new_external_device_ip, new_config)
def _router_lifecycle(self, enable_ha):
- router = self.manage_router(enable_ha)
+ router_info = self.generate_router_info(enable_ha)
+ router = self.manage_router(self.agent, router_info)
if enable_ha:
self.wait_until(lambda: router.ha_state == 'master')
router.ns_name)
self.assertTrue(self._namespace_exists(router))
- self.assertTrue(self._metadata_proxy_exists(router))
+ self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
self._assert_internal_devices(router)
self._assert_external_device(router)
self._assert_gateway(router)
self._assert_ha_device(router)
self.assertTrue(router.keepalived_manager.process.active)
- self._delete_router(router.router_id)
+ self._delete_router(self.agent, router.router_id)
self._assert_router_does_not_exist(router)
if enable_ha:
# then the devices and iptable rules have also been deleted,
# so there's no need to check that explicitly.
self.assertFalse(self._namespace_exists(router))
- self.assertFalse(self._metadata_proxy_exists(router))
+ self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
def _assert_ha_device(self, router):
self.assertTrue(self.device_exists_with_ip_mac(
router.router[l3_constants.HA_INTERFACE_KEY],
self.agent.get_ha_device_name, router.ns_name))
+
+
+class L3HATestFramework(L3AgentTestFramework):
+ def setUp(self):
+ super(L3HATestFramework, self).setUp()
+ self.failover_agent = self._configure_agent('agent2')
+
+ br_int_1 = self.get_ovs_bridge(
+ self.agent.conf.ovs_integration_bridge)
+ br_int_2 = self.get_ovs_bridge(
+ self.failover_agent.conf.ovs_integration_bridge)
+
+ veth1, veth2 = self.create_veth()
+ br_int_1.add_port(veth1.name)
+ br_int_2.add_port(veth2.name)
+
+ def test_ha_router_failover(self):
+ router_info = self.generate_router_info(enable_ha=True)
+ router1 = self.manage_router(self.agent, router_info)
+
+ router_info_2 = copy.deepcopy(router_info)
+ router_info_2[l3_constants.HA_INTERFACE_KEY] = (
+ test_l3_agent.get_ha_interface(ip='169.254.0.3',
+ mac='22:22:22:22:22:22'))
+
+ router2 = self.manage_router(self.failover_agent, router_info_2)
+
+ self.wait_until(lambda: router1.ha_state == 'master')
+ self.wait_until(lambda: router2.ha_state == 'backup')
+
+ device_name = self.agent.get_ha_device_name(
+ router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
+ ha_device = ip_lib.IPDevice(device_name, self.root_helper,
+ router1.ns_name)
+ ha_device.link.set_down()
+
+ self.wait_until(lambda: router2.ha_state == 'master')
+ self.wait_until(lambda: router1.ha_state == 'fault')