ra._generate_radvd_conf(ri.router['id'],
router[l3_constants.INTERFACE_KEY],
mock.Mock())
- asserter = self.assertIn if flag_set else self.assertNotIn
- asserter('AdvOtherConfigFlag on;',
- self.utils_replace_file.call_args[0][1])
+
+ def assertFlag(flag):
+ return (self.assertIn if flag else self.assertNotIn)
+
+ other_flag, managed_flag = flags_set
+ if not skip(other_flag):
+ assertFlag(other_flag)('AdvOtherConfigFlag on;',
+ self.utils_replace_file.call_args[0][1])
+
+ if not skip(managed_flag):
+ assertFlag(managed_flag)('AdvManagedFlag on',
+ self.utils_replace_file.call_args[0][1])
+ def test__put_fips_in_error_state(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ ri = mock.Mock()
+ ri.router.get.return_value = [{'id': mock.sentinel.id1},
+ {'id': mock.sentinel.id2}]
-class TestL3AgentEventHandler(base.BaseTestCase):
-
- def setUp(self):
- super(TestL3AgentEventHandler, self).setUp()
- cfg.CONF.register_opts(l3_agent.L3NATAgent.OPTS)
- cfg.CONF.register_opts(ha.OPTS)
- agent_config.register_interface_driver_opts_helper(cfg.CONF)
- agent_config.register_use_namespaces_opts_helper(cfg.CONF)
- cfg.CONF.set_override(
- 'interface_driver', 'neutron.agent.linux.interface.NullDriver'
- )
- cfg.CONF.set_override('use_namespaces', True)
- cfg.CONF.set_override('verbose', False)
- agent_config.register_root_helper(cfg.CONF)
-
- device_exists_p = mock.patch(
- 'neutron.agent.linux.ip_lib.device_exists')
- device_exists_p.start()
+ statuses = agent._put_fips_in_error_state(ri)
- utils_exec_p = mock.patch(
- 'neutron.agent.linux.utils.execute')
- utils_exec_p.start()
+ expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
+ mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
+ self.assertNotEqual(expected, statuses)
- drv_cls_p = mock.patch('neutron.agent.linux.interface.NullDriver')
- driver_cls = drv_cls_p.start()
- mock_driver = mock.MagicMock()
- mock_driver.DEV_NAME_LEN = (
- interface.LinuxInterfaceDriver.DEV_NAME_LEN)
- driver_cls.return_value = mock_driver
+ def test__process_snat_dnat_for_fip(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent.process_router_floating_ip_nat_rules = mock.Mock(
+ side_effect=Exception)
- l3_plugin_p = mock.patch(
- 'neutron.agent.l3.agent.L3PluginApi')
- l3_plugin_cls = l3_plugin_p.start()
- l3_plugin_cls.return_value = mock.MagicMock()
+ self.assertRaises(n_exc.FloatingIpSetupException,
+ agent._process_snat_dnat_for_fip,
+ mock.sentinel.ri)
- self.external_process_p = mock.patch(
- 'neutron.agent.linux.external_process.ProcessManager'
- )
- self.external_process_p.start()
- looping_call_p = mock.patch(
- 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
- looping_call_p.start()
- self.agent = l3_agent.L3NATAgent(HOSTNAME)
+ agent.process_router_floating_ip_nat_rules.assert_called_with(
+ mock.sentinel.ri)
- def test_spawn_metadata_proxy(self):
- router_id = _uuid()
- metadata_port = 8080
- ip_class_path = 'neutron.agent.linux.ip_lib.IPWrapper'
+ def test__configure_fip_addresses(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent.process_router_floating_ip_addresses = mock.Mock(
+ side_effect=Exception)
- cfg.CONF.set_override('metadata_port', metadata_port)
- cfg.CONF.set_override('log_file', 'test.log')
- cfg.CONF.set_override('debug', True)
+ self.assertRaises(n_exc.FloatingIpSetupException,
+ agent._configure_fip_addresses,
+ mock.sentinel.ri,
+ mock.sentinel.ex_gw_port)
- self.external_process_p.stop()
- ri = l3router.RouterInfo(router_id, None, None)
- with mock.patch(ip_class_path) as ip_mock:
- self.agent._spawn_metadata_proxy(ri.router_id, ri.ns_name)
- ip_mock.assert_has_calls([
- mock.call('sudo', ri.ns_name),
- mock.call().netns.execute([
- 'neutron-ns-metadata-proxy',
- mock.ANY,
- mock.ANY,
- '--router_id=%s' % router_id,
- mock.ANY,
- '--metadata_port=%s' % metadata_port,
- '--debug',
- '--log-file=neutron-ns-metadata-proxy-%s.log' %
- router_id
- ], addl_env=None)
- ])
+ agent.process_router_floating_ip_addresses.assert_called_with(
+ mock.sentinel.ri,
+ mock.sentinel.ex_gw_port)