def _router_interface_action(self, action, router_id, subnet_id, port_id,
expected_code=exc.HTTPOk.code,
expected_body=None,
- tenant_id=None):
+ tenant_id=None,
+ msg=None):
interface_data = {}
if subnet_id:
interface_data.update({'subnet_id': subnet_id})
req.environ['neutron.context'] = context.Context(
'', tenant_id)
res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, expected_code)
+ self.assertEqual(res.status_int, expected_code, msg)
response = self.deserialize(self.fmt, res)
if expected_body:
- self.assertEqual(response, expected_body)
+ self.assertEqual(response, expected_body, msg)
return response
@contextlib.contextmanager
fip['floatingip']['router_id'], None,
expected_code=exc.HTTPConflict.code)
- def test_router_add_interface_subnet(self):
+ def _test_router_add_interface_subnet(self, router, subnet, msg=None):
exp_notifications = ['router.create.start',
'router.create.end',
'network.create.start',
'subnet.create.end',
'router.interface.create',
'router.interface.delete']
- fake_notifier.reset()
- with self.router() as r:
- with self.subnet() as s:
- body = self._router_interface_action('add',
- r['router']['id'],
- s['subnet']['id'],
- None)
- self.assertIn('port_id', body)
+ body = self._router_interface_action('add',
+ router['router']['id'],
+ subnet['subnet']['id'],
+ None)
+ self.assertIn('port_id', body, msg)
+
+ # fetch port and confirm device_id
+ r_port_id = body['port_id']
+ port = self._show('ports', r_port_id)
+ self.assertEqual(port['port']['device_id'],
+ router['router']['id'], msg)
+
+ self._router_interface_action('remove',
+ router['router']['id'],
+ subnet['subnet']['id'],
+ None)
+ self._show('ports', r_port_id,
+ expected_code=exc.HTTPNotFound.code)
- # fetch port and confirm device_id
- r_port_id = body['port_id']
- body = self._show('ports', r_port_id)
- self.assertEqual(body['port']['device_id'], r['router']['id'])
+ self.assertEqual(
+ set(exp_notifications),
+ set(n['event_type'] for n in fake_notifier.NOTIFICATIONS), msg)
+
+ for n in fake_notifier.NOTIFICATIONS:
+ if n['event_type'].startswith('router.interface.'):
+ payload = n['payload']['router_interface']
+ self.assertIn('id', payload)
+ self.assertEqual(payload['id'], router['router']['id'])
+ self.assertIn('tenant_id', payload)
+ stid = subnet['subnet']['tenant_id']
+ # tolerate subnet tenant deliberately set to '' in the
+ # nsx metadata access case
+ self.assertIn(payload['tenant_id'], [stid, ''], msg)
- body = self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
- body = self._show('ports', r_port_id,
- expected_code=exc.HTTPNotFound.code)
-
- self.assertEqual(
- set(exp_notifications),
- set(n['event_type'] for n in fake_notifier.NOTIFICATIONS))
-
- for n in fake_notifier.NOTIFICATIONS:
- if n['event_type'].startswith('router.interface.'):
- payload = n['payload']['router_interface']
- self.assertIn('id', payload)
- self.assertEqual(payload['id'], r['router']['id'])
- self.assertIn('tenant_id', payload)
- stid = s['subnet']['tenant_id']
- # tolerate subnet tenant deliberately to '' in the
- # nsx metadata access case
- self.assertIn(payload['tenant_id'], [stid, ''])
+ def test_router_add_interface_subnet(self):
+ fake_notifier.reset()
+ with self.router() as r:
+ with self.network() as n:
+ with self.subnet(network=n) as s:
+ self._test_router_add_interface_subnet(r, s)
+
+ def test_router_add_interface_ipv6_subnet(self):
+ """Test router-interface-add for valid ipv6 subnets.
+
+ Verify the valid use-cases of an IPv6 subnet where we
+ are allowed to associate to the Neutron Router are successful.
+ """
+ slaac = l3_constants.IPV6_SLAAC
+ stateful = l3_constants.DHCPV6_STATEFUL
+ stateless = l3_constants.DHCPV6_STATELESS
+ use_cases = [{'msg': 'IPv6 Subnet Modes (slaac, none)',
+ 'ra_mode': slaac, 'address_mode': None},
+ {'msg': 'IPv6 Subnet Modes (none, none)',
+ 'ra_mode': None, 'address_mode': None},
+ {'msg': 'IPv6 Subnet Modes (dhcpv6-stateful, none)',
+ 'ra_mode': stateful, 'address_mode': None},
+ {'msg': 'IPv6 Subnet Modes (dhcpv6-stateless, none)',
+ 'ra_mode': stateless, 'address_mode': None},
+ {'msg': 'IPv6 Subnet Modes (slaac, slaac)',
+ 'ra_mode': slaac, 'address_mode': slaac},
+ {'msg': 'IPv6 Subnet Modes (dhcpv6-stateful,'
+ 'dhcpv6-stateful)', 'ra_mode': stateful,
+ 'address_mode': stateful},
+ {'msg': 'IPv6 Subnet Modes (dhcpv6-stateless,'
+ 'dhcpv6-stateless)', 'ra_mode': stateless,
+ 'address_mode': stateless}]
+ for uc in use_cases:
+ fake_notifier.reset()
+ with contextlib.nested(self.router(), self.network()) as (r, n):
+ with self.subnet(network=n, cidr='fd00::1/64',
+ gateway_ip='fd00::1', ip_version=6,
+ ipv6_ra_mode=uc['ra_mode'],
+ ipv6_address_mode=uc['address_mode']) as s:
+ self._test_router_add_interface_subnet(r, s, uc['msg'])
+ self._delete('subnets', s['subnet']['id'])
+
+ def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
+ """Test router-interface-add for in-valid ipv6 subnets.
+
+ Verify that an appropriate error message is displayed when
+ an IPv6 subnet configured to use an external_router for Router
+ Advertisements (i.e., ipv6_ra_mode is None and ipv6_address_mode
+ is not None) is attempted to associate with a Neutron Router.
+ """
+ use_cases = [{'msg': 'IPv6 Subnet Modes (none, slaac)',
+ 'ra_mode': None,
+ 'address_mode': l3_constants.IPV6_SLAAC},
+ {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateful)',
+ 'ra_mode': None,
+ 'address_mode': l3_constants.DHCPV6_STATEFUL},
+ {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateless)',
+ 'ra_mode': None,
+ 'address_mode': l3_constants.DHCPV6_STATELESS}]
+ for uc in use_cases:
+ with contextlib.nested(self.router(), self.network()) as (r, n):
+ with self.subnet(network=n, cidr='fd00::1/64',
+ gateway_ip='fd00::1', ip_version=6,
+ ipv6_ra_mode=uc['ra_mode'],
+ ipv6_address_mode=uc['address_mode']) as s:
+ exp_code = exc.HTTPBadRequest.code
+ self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None,
+ expected_code=exp_code,
+ msg=uc['msg'])
+ self._delete('subnets', s['subnet']['id'])
def test_router_add_interface_ipv6_subnet_without_gateway_ip(self):
with self.router() as r: