"add_router_interface": "rule:admin_or_owner",
"remove_router_interface": "rule:admin_or_owner",
+ "create_router:external_gateway_info:external_fixed_ips": "rule:admin_only",
+ "update_router:external_gateway_info:external_fixed_ips": "rule:admin_only",
+
"create_firewall": "",
"get_firewall": "rule:admin_or_owner",
"create_firewall:shared": "rule:admin_only",
return candidates
- def _create_router_gw_port(self, context, router, network_id):
+ def _create_router_gw_port(self, context, router, network_id, ext_ips):
+ if ext_ips and len(ext_ips) > 1:
+ msg = _("Routers support only 1 external IP")
+ raise n_exc.BadRequest(resource='router', msg=msg)
# Port has no 'tenant-id', as it is hidden from user
gw_port = self._core_plugin.create_port(context.elevated(), {
'port': {'tenant_id': '', # intentionally not set
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
+ 'fixed_ips': ext_ips or attributes.ATTR_NOT_SPECIFIED,
'device_id': router['id'],
'device_owner': DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
context.session.add(router)
context.session.add(router_port)
- def _validate_gw_info(self, context, gw_port, info):
+ def _validate_gw_info(self, context, gw_port, info, ext_ips):
network_id = info['network_id'] if info else None
if network_id:
network_db = self._core_plugin._get_network(context, network_id)
if not network_db.external:
msg = _("Network %s is not an external network") % network_id
raise n_exc.BadRequest(resource='router', msg=msg)
+ if ext_ips:
+ subnets = self._core_plugin._get_subnets_by_network(context,
+ network_id)
+ for s in subnets:
+ if not s['gateway_ip']:
+ continue
+ for ext_ip in ext_ips:
+ if ext_ip.get('ip_address') == s['gateway_ip']:
+ msg = _("External IP %s is the same as the "
+ "gateway IP") % ext_ip.get('ip_address')
+ raise n_exc.BadRequest(resource='router', msg=msg)
return network_id
- def _delete_current_gw_port(self, context, router_id, router, new_network):
- """Delete gw port, if it is attached to an old network."""
- is_gw_port_attached_to_existing_network = (
- router.gw_port and router.gw_port['network_id'] != new_network)
+ def _delete_current_gw_port(self, context, router_id, router, new_network,
+ ext_ip_change):
+ """Delete gw port if attached to an old network or IPs changed."""
+ port_requires_deletion = (
+ router.gw_port and
+ (router.gw_port['network_id'] != new_network or ext_ip_change)
+ )
+ if not port_requires_deletion:
+ return
admin_ctx = context.elevated()
- if is_gw_port_attached_to_existing_network:
- if self.get_floatingips_count(
- admin_ctx, {'router_id': [router_id]}):
- raise l3.RouterExternalGatewayInUseByFloatingIp(
- router_id=router_id, net_id=router.gw_port['network_id'])
- with context.session.begin(subtransactions=True):
- gw_port = router.gw_port
- router.gw_port = None
- context.session.add(router)
- context.session.expire(gw_port)
- vpnservice = manager.NeutronManager.get_service_plugins().get(
- constants.VPN)
- if vpnservice:
- vpnservice.check_router_in_use(context, router_id)
- self._core_plugin.delete_port(
- admin_ctx, gw_port['id'], l3_port_check=False)
-
- def _create_gw_port(self, context, router_id, router, new_network):
+
+ if self.get_floatingips_count(
+ admin_ctx, {'router_id': [router_id]}):
+ raise l3.RouterExternalGatewayInUseByFloatingIp(
+ router_id=router_id, net_id=router.gw_port['network_id'])
+ with context.session.begin(subtransactions=True):
+ gw_port = router.gw_port
+ router.gw_port = None
+ context.session.add(router)
+ context.session.expire(gw_port)
+ vpnservice = manager.NeutronManager.get_service_plugins().get(
+ constants.VPN)
+ if vpnservice:
+ vpnservice.check_router_in_use(context, router_id)
+ self._core_plugin.delete_port(
+ admin_ctx, gw_port['id'], l3_port_check=False)
+
+ def _create_gw_port(self, context, router_id, router, new_network,
+ ext_ips, ext_ip_change):
new_valid_gw_port_attachment = (
- new_network and (not router.gw_port or
+ new_network and (not router.gw_port or ext_ip_change or
router.gw_port['network_id'] != new_network))
if new_valid_gw_port_attachment:
subnets = self._core_plugin._get_subnets_by_network(context,
self._check_for_dup_router_subnet(context, router,
new_network, subnet['id'],
subnet['cidr'])
- self._create_router_gw_port(context, router, new_network)
+ self._create_router_gw_port(context, router, new_network, ext_ips)
def _update_router_gw_info(self, context, router_id, info, router=None):
# TODO(salvatore-orlando): guarantee atomic behavior also across
# class (e.g.: delete_port)
router = router or self._get_router(context, router_id)
gw_port = router.gw_port
- network_id = self._validate_gw_info(context, gw_port, info)
- self._delete_current_gw_port(context, router_id, router, network_id)
- self._create_gw_port(context, router_id, router, network_id)
+ ext_ips = info.get('external_fixed_ips') if info else []
+ ext_ip_change = self._check_for_external_ip_change(
+ context, gw_port, ext_ips)
+ network_id = self._validate_gw_info(context, gw_port, info, ext_ips)
+ self._delete_current_gw_port(context, router_id, router, network_id,
+ ext_ip_change)
+ self._create_gw_port(context, router_id, router, network_id, ext_ips,
+ ext_ip_change)
+
+ def _check_for_external_ip_change(self, context, gw_port, ext_ips):
+ # determine if new external IPs differ from the existing fixed_ips
+ if not ext_ips:
+ # no external_fixed_ips were included
+ return False
+ if not gw_port:
+ return True
+
+ subnet_ids = set(ip['subnet_id'] for ip in gw_port['fixed_ips'])
+ new_subnet_ids = set(f['subnet_id'] for f in ext_ips
+ if f.get('subnet_id'))
+ subnet_change = not new_subnet_ids == subnet_ids
+ if subnet_change:
+ return True
+ ip_addresses = set(ip['ip_address'] for ip in gw_port['fixed_ips'])
+ new_ip_addresses = set(f['ip_address'] for f in ext_ips
+ if f.get('ip_address'))
+ ip_address_change = not ip_addresses == new_ip_addresses
+ return ip_address_change
def _ensure_router_not_in_use(self, context, router_id):
admin_ctx = context.elevated()
agent['id'])
return router_db
- def _delete_current_gw_port(self, context, router_id, router, new_network):
+ def _delete_current_gw_port(self, context, router_id, router, new_network,
+ ext_ip_change):
super(L3_NAT_with_dvr_db_mixin,
self)._delete_current_gw_port(context, router_id,
- router, new_network)
+ router, new_network, ext_ip_change)
if router.extra_attributes.distributed:
self.delete_csnat_router_interface_ports(
context.elevated(), router)
- def _create_gw_port(self, context, router_id, router, new_network):
+ def _create_gw_port(self, context, router_id, router, new_network, ext_ips,
+ ext_ip_change):
super(L3_NAT_with_dvr_db_mixin,
- self)._create_gw_port(context, router_id,
- router, new_network)
+ self)._create_gw_port(context, router_id, router, new_network,
+ ext_ips, ext_ip_change)
# Make sure that the gateway port exists before creating the
# snat interface ports for distributed router.
if router.extra_attributes.distributed and router.gw_port:
'create_snat_intf_ports_if_not_exists')
) as (cw, cs):
self.mixin._create_gw_port(
- self.ctx, router_id, router_db, mock.ANY)
+ self.ctx, router_id, router_db, mock.ANY,
+ mock.ANY, mock.ANY)
self.assertFalse(cs.call_count)
def test_build_routers_list_with_gw_port_mismatch(self):
r['router']['id'],
public_sub['subnet']['network_id'])
+ def test_router_update_gateway_with_different_external_subnet(self):
+ self.skipTest("Plugin doesn't support multiple external networks")
+
+ def test_router_create_with_gwinfo_ext_ip_subnet(self):
+ self.skipTest("Plugin doesn't support multiple external networks")
+
class TestNuageBasicGet(NuagePluginV2TestCase,
test_db_plugin.TestBasicGet):
class TestNuageExtrarouteTestCase(NuagePluginV2TestCase,
extraroute_test.ExtraRouteDBIntTestCase):
+ def test_router_create_with_gwinfo_ext_ip_subnet(self):
+ self.skipTest("Nuage plugin does not support multiple subnets per "
+ "external network.")
+
+ def test_router_update_gateway_with_different_external_subnet(self):
+ self.skipTest("Nuage plugin does not support multiple subnets per "
+ "external networks.")
+
def test_router_update_with_dup_destination_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
def _add_external_gateway_to_router(self, router_id, network_id,
expected_code=exc.HTTPOk.code,
- neutron_context=None):
- return self._update('routers', router_id,
- {'router': {'external_gateway_info':
- {'network_id': network_id}}},
+ neutron_context=None, ext_ips=[]):
+ body = {'router':
+ {'external_gateway_info': {'network_id': network_id}}}
+ if ext_ips:
+ body['router']['external_gateway_info'][
+ 'external_fixed_ips'] = ext_ips
+ return self._update('routers', router_id, body,
expected_code=expected_code,
neutron_context=neutron_context)
router['router']['external_gateway_info']['network_id'])
self._delete('routers', router['router']['id'])
+ def test_router_create_with_gwinfo_ext_ip(self):
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ ext_info = {
+ 'network_id': s['subnet']['network_id'],
+ 'external_fixed_ips': [{'ip_address': '10.0.0.99'}]
+ }
+ res = self._create_router(
+ self.fmt, _uuid(), arg_list=('external_gateway_info',),
+ external_gateway_info=ext_info
+ )
+ router = self.deserialize(self.fmt, res)
+ self._delete('routers', router['router']['id'])
+ self.assertEqual(
+ [{'ip_address': '10.0.0.99', 'subnet_id': s['subnet']['id']}],
+ router['router']['external_gateway_info'][
+ 'external_fixed_ips'])
+
+ def test_router_create_with_gwinfo_ext_ip_subnet(self):
+ with self.network() as n:
+ with contextlib.nested(
+ self.subnet(network=n),
+ self.subnet(network=n, cidr='1.0.0.0/24'),
+ self.subnet(network=n, cidr='2.0.0.0/24'),
+ ) as subnets:
+ self._set_net_external(n['network']['id'])
+ for s in subnets:
+ ext_info = {
+ 'network_id': n['network']['id'],
+ 'external_fixed_ips': [
+ {'subnet_id': s['subnet']['id']}]
+ }
+ res = self._create_router(
+ self.fmt, _uuid(), arg_list=('external_gateway_info',),
+ external_gateway_info=ext_info
+ )
+ router = self.deserialize(self.fmt, res)
+ ext_ips = router['router']['external_gateway_info'][
+ 'external_fixed_ips']
+
+ self._delete('routers', router['router']['id'])
+ self.assertEqual(
+ [{'subnet_id': s['subnet']['id'],
+ 'ip_address': mock.ANY}], ext_ips)
+
+ def test_router_create_with_gwinfo_ext_ip_non_admin(self):
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ ext_info = {
+ 'network_id': s['subnet']['network_id'],
+ 'external_fixed_ips': [{'ip_address': '10.0.0.99'}]
+ }
+ res = self._create_router(
+ self.fmt, _uuid(), arg_list=('external_gateway_info',),
+ set_context=True, external_gateway_info=ext_info
+ )
+ self.assertEqual(res.status_int, exc.HTTPForbidden.code)
+
def test_router_list(self):
with contextlib.nested(self.router(),
self.router(),
s2['subnet']['network_id'],
external_gw_info={})
+ def test_router_update_gateway_with_external_ip_used_by_gw(self):
+ with self.router() as r:
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'],
+ ext_ips=[{'ip_address': s['subnet']['gateway_ip']}],
+ expected_code=exc.HTTPBadRequest.code)
+
+ def test_router_update_gateway_with_invalid_external_ip(self):
+ with self.router() as r:
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'],
+ ext_ips=[{'ip_address': '99.99.99.99'}],
+ expected_code=exc.HTTPBadRequest.code)
+
+ def test_router_update_gateway_with_invalid_external_subnet(self):
+ with contextlib.nested(
+ self.subnet(),
+ self.subnet(cidr='1.0.0.0/24'),
+ self.router()
+ ) as (s1, s2, r):
+ self._set_net_external(s1['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s1['subnet']['network_id'],
+ # this subnet is not on the same network so this should fail
+ ext_ips=[{'subnet_id': s2['subnet']['id']}],
+ expected_code=exc.HTTPBadRequest.code)
+
+ def test_router_update_gateway_with_different_external_subnet(self):
+ with self.network() as n:
+ with contextlib.nested(
+ self.subnet(network=n),
+ self.subnet(network=n, cidr='1.0.0.0/24'),
+ self.router()
+ ) as (s1, s2, r):
+ self._set_net_external(n['network']['id'])
+ res1 = self._add_external_gateway_to_router(
+ r['router']['id'],
+ n['network']['id'],
+ ext_ips=[{'subnet_id': s1['subnet']['id']}])
+ res2 = self._add_external_gateway_to_router(
+ r['router']['id'],
+ n['network']['id'],
+ ext_ips=[{'subnet_id': s2['subnet']['id']}])
+ fip1 = res1['router']['external_gateway_info']['external_fixed_ips'][0]
+ fip2 = res2['router']['external_gateway_info']['external_fixed_ips'][0]
+ self.assertEqual(s1['subnet']['id'], fip1['subnet_id'])
+ self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
+ self.assertNotEqual(fip1['subnet_id'], fip2['subnet_id'])
+ self.assertNotEqual(fip1['ip_address'], fip2['ip_address'])
+
def test_router_update_gateway_with_existed_floatingip(self):
with self.subnet() as subnet:
self._set_net_external(subnet['subnet']['network_id'])
pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID))
+ #REVISIT: remove the following skips if external IP spec support is added
+ def test_router_create_with_gwinfo_ext_ip(self):
+ raise self.skipException('External IP specification unsupported')
+
+ def test_router_create_with_gwinfo_ext_ip_non_admin(self):
+ raise self.skipException('External IP specification unsupported')
+
+ def test_router_update_gateway_with_different_external_subnet(self):
+ raise self.skipException('External IP specification unsupported')
+
+ def test_router_create_with_gwinfo_ext_ip_subnet(self):
+ raise self.skipException('External IP specification unsupported')
+
class TestL3NatTestCase(L3NatTest,
test_l3_plugin.L3NatDBIntTestCase,