From: sridhargaddam Date: Mon, 24 Nov 2014 10:17:36 +0000 (+0000) Subject: Validate IPv6 subnet while associating to Router X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=e5713f932482210107dcfd7bac6a78524c8bad26;p=openstack-build%2Fneutron-build.git Validate IPv6 subnet while associating to Router Currently Neutron allows attaching a subnet (configured to use an external router, by only setting ipv6_address_mode and leaving ipv6_ra_mode unset) to Neutron Router. Ideally Neutron should not allow this operation and should return an appropriate error message to the user. APIImpact Closes-Bug: #1393527 Change-Id: I9d597e6f5e8aea63222bb9f5ed8289e4ce28bbc3 --- diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 31f6a6bd9..6a3d1e118 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -514,6 +514,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): if not subnet['gateway_ip']: msg = _('Subnet for router interface must have a gateway IP') raise n_exc.BadRequest(resource='router', msg=msg) + if (subnet['ip_version'] == 6 and subnet['ipv6_ra_mode'] is None + and subnet['ipv6_address_mode'] is not None): + msg = (_('IPv6 subnet %s configured to receive RAs from an ' + 'external router cannot be added to Neutron Router.') % + subnet['id']) + raise n_exc.BadRequest(resource='router', msg=msg) self._check_for_dup_router_subnet(context, router, subnet['network_id'], subnet_id, diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index 190c25e15..24f82fec2 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -382,7 +382,8 @@ class L3NatTestCaseMixin(object): def _router_interface_action(self, action, router_id, subnet_id, port_id, expected_code=exc.HTTPOk.code, expected_body=None, - tenant_id=None): + tenant_id=None, + msg=None): interface_data = {} if subnet_id: interface_data.update({'subnet_id': subnet_id}) @@ -396,10 +397,10 @@ class L3NatTestCaseMixin(object): req.environ['neutron.context'] = context.Context( '', tenant_id) res = req.get_response(self.ext_api) - self.assertEqual(res.status_int, expected_code) + self.assertEqual(res.status_int, expected_code, msg) response = self.deserialize(self.fmt, res) if expected_body: - self.assertEqual(response, expected_body) + self.assertEqual(response, expected_body, msg) return response @contextlib.contextmanager @@ -844,7 +845,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): fip['floatingip']['router_id'], None, expected_code=exc.HTTPConflict.code) - def test_router_add_interface_subnet(self): + def _test_router_add_interface_subnet(self, router, subnet, msg=None): exp_notifications = ['router.create.start', 'router.create.end', 'network.create.start', @@ -853,41 +854,113 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): 'subnet.create.end', 'router.interface.create', 'router.interface.delete'] - fake_notifier.reset() - with self.router() as r: - with self.subnet() as s: - body = self._router_interface_action('add', - r['router']['id'], - s['subnet']['id'], - None) - self.assertIn('port_id', body) + body = self._router_interface_action('add', + router['router']['id'], + subnet['subnet']['id'], + None) + self.assertIn('port_id', body, msg) + + # fetch port and confirm device_id + r_port_id = body['port_id'] + port = self._show('ports', r_port_id) + self.assertEqual(port['port']['device_id'], + router['router']['id'], msg) + + self._router_interface_action('remove', + router['router']['id'], + subnet['subnet']['id'], + None) + self._show('ports', r_port_id, + expected_code=exc.HTTPNotFound.code) - # fetch port and confirm device_id - r_port_id = body['port_id'] - body = self._show('ports', r_port_id) - self.assertEqual(body['port']['device_id'], r['router']['id']) + self.assertEqual( + set(exp_notifications), + set(n['event_type'] for n in fake_notifier.NOTIFICATIONS), msg) + + for n in fake_notifier.NOTIFICATIONS: + if n['event_type'].startswith('router.interface.'): + payload = n['payload']['router_interface'] + self.assertIn('id', payload) + self.assertEqual(payload['id'], router['router']['id']) + self.assertIn('tenant_id', payload) + stid = subnet['subnet']['tenant_id'] + # tolerate subnet tenant deliberately set to '' in the + # nsx metadata access case + self.assertIn(payload['tenant_id'], [stid, ''], msg) - body = self._router_interface_action('remove', - r['router']['id'], - s['subnet']['id'], - None) - body = self._show('ports', r_port_id, - expected_code=exc.HTTPNotFound.code) - - self.assertEqual( - set(exp_notifications), - set(n['event_type'] for n in fake_notifier.NOTIFICATIONS)) - - for n in fake_notifier.NOTIFICATIONS: - if n['event_type'].startswith('router.interface.'): - payload = n['payload']['router_interface'] - self.assertIn('id', payload) - self.assertEqual(payload['id'], r['router']['id']) - self.assertIn('tenant_id', payload) - stid = s['subnet']['tenant_id'] - # tolerate subnet tenant deliberately to '' in the - # nsx metadata access case - self.assertIn(payload['tenant_id'], [stid, '']) + def test_router_add_interface_subnet(self): + fake_notifier.reset() + with self.router() as r: + with self.network() as n: + with self.subnet(network=n) as s: + self._test_router_add_interface_subnet(r, s) + + def test_router_add_interface_ipv6_subnet(self): + """Test router-interface-add for valid ipv6 subnets. + + Verify the valid use-cases of an IPv6 subnet where we + are allowed to associate to the Neutron Router are successful. + """ + slaac = l3_constants.IPV6_SLAAC + stateful = l3_constants.DHCPV6_STATEFUL + stateless = l3_constants.DHCPV6_STATELESS + use_cases = [{'msg': 'IPv6 Subnet Modes (slaac, none)', + 'ra_mode': slaac, 'address_mode': None}, + {'msg': 'IPv6 Subnet Modes (none, none)', + 'ra_mode': None, 'address_mode': None}, + {'msg': 'IPv6 Subnet Modes (dhcpv6-stateful, none)', + 'ra_mode': stateful, 'address_mode': None}, + {'msg': 'IPv6 Subnet Modes (dhcpv6-stateless, none)', + 'ra_mode': stateless, 'address_mode': None}, + {'msg': 'IPv6 Subnet Modes (slaac, slaac)', + 'ra_mode': slaac, 'address_mode': slaac}, + {'msg': 'IPv6 Subnet Modes (dhcpv6-stateful,' + 'dhcpv6-stateful)', 'ra_mode': stateful, + 'address_mode': stateful}, + {'msg': 'IPv6 Subnet Modes (dhcpv6-stateless,' + 'dhcpv6-stateless)', 'ra_mode': stateless, + 'address_mode': stateless}] + for uc in use_cases: + fake_notifier.reset() + with contextlib.nested(self.router(), self.network()) as (r, n): + with self.subnet(network=n, cidr='fd00::1/64', + gateway_ip='fd00::1', ip_version=6, + ipv6_ra_mode=uc['ra_mode'], + ipv6_address_mode=uc['address_mode']) as s: + self._test_router_add_interface_subnet(r, s, uc['msg']) + self._delete('subnets', s['subnet']['id']) + + def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self): + """Test router-interface-add for in-valid ipv6 subnets. + + Verify that an appropriate error message is displayed when + an IPv6 subnet configured to use an external_router for Router + Advertisements (i.e., ipv6_ra_mode is None and ipv6_address_mode + is not None) is attempted to associate with a Neutron Router. + """ + use_cases = [{'msg': 'IPv6 Subnet Modes (none, slaac)', + 'ra_mode': None, + 'address_mode': l3_constants.IPV6_SLAAC}, + {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateful)', + 'ra_mode': None, + 'address_mode': l3_constants.DHCPV6_STATEFUL}, + {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateless)', + 'ra_mode': None, + 'address_mode': l3_constants.DHCPV6_STATELESS}] + for uc in use_cases: + with contextlib.nested(self.router(), self.network()) as (r, n): + with self.subnet(network=n, cidr='fd00::1/64', + gateway_ip='fd00::1', ip_version=6, + ipv6_ra_mode=uc['ra_mode'], + ipv6_address_mode=uc['address_mode']) as s: + exp_code = exc.HTTPBadRequest.code + self._router_interface_action('add', + r['router']['id'], + s['subnet']['id'], + None, + expected_code=exp_code, + msg=uc['msg']) + self._delete('subnets', s['subnet']['id']) def test_router_add_interface_ipv6_subnet_without_gateway_ip(self): with self.router() as r: