with context.session.begin(subtransactions=True):
router.update({'gw_port_id': None})
context.session.add(router)
- self.delete_port(context, gw_port['id'])
+ self.delete_port(context, gw_port['id'], l3_port_check=False)
if network_id is not None and (gw_port is None or
gw_port['network_id'] != network_id):
'name': ''}})
if not len(gw_port['fixed_ips']):
- self.delete_port(context, gw_port['id'])
+ self.delete_port(context, gw_port['id'], l3_port_check=False)
msg = ('No IPs available for external network %s' %
network_id)
raise q_exc.BadRequest(resource='router', msg=msg)
self._check_for_dup_router_subnet(context, router_id,
port['network_id'],
fixed_ips[0]['subnet_id'])
- port.update({'device_id': router_id,
- 'device_owner': DEVICE_OWNER_ROUTER_INTF})
+ with context.session.begin(subtransactions=True):
+ port.update({'device_id': router_id,
+ 'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
subnet = self._get_subnet(context, subnet_id)
msg = "Either subnet_id or port_id must be specified"
raise q_exc.BadRequest(resource='router', msg=msg)
if 'port_id' in interface_info:
- port_db = self._get_port(context, interface_info['port_id'])
+ port_id = interface_info['port_id']
+ port_db = self._get_port(context, port_id)
+ if not (port_db['device_owner'] == DEVICE_OWNER_ROUTER_INTF and
+ port_db['device_id'] == router_id):
+ raise w_exc.HTTPNotFound("Router %(router_id)s does not have "
+ " an interface with id %(port_id)s"
+ % locals())
if 'subnet_id' in interface_info:
port_subnet_id = port_db['fixed_ips'][0]['subnet_id']
if port_subnet_id != interface_info['subnet_id']:
if port_db['device_id'] != router_id:
raise w_exc.HTTPConflict("port_id %s not used by router" %
port_db['id'])
- self.delete_port(context, port_db['id'])
+ self.delete_port(context, port_db['id'], l3_port_check=False)
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
subnet = self._get_subnet(context, subnet_id)
for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
- self.delete_port(context, p['id'])
+ self.delete_port(context, p['id'], l3_port_check=False)
found = True
break
except exc.NoResultFound:
if not external_port['fixed_ips']:
msg = "Unable to find any IP address on external network"
# remove the external port
- self.delete_port(context, external_port['id'])
+ self.delete_port(context, external_port['id'], l3_port_check=False)
raise q_exc.BadRequest(resource='floatingip', msg=msg)
floating_ip_address = external_port['fixed_ips'][0]['ip_address']
except Exception:
LOG.exception("Floating IP association failed")
# Remove the port created for internal purposes
- self.delete_port(context, external_port['id'])
+ self.delete_port(context, external_port['id'], l3_port_check=False)
raise
return self._make_floatingip_dict(floatingip_db)
floatingip = self._get_floatingip(context, id)
with context.session.begin(subtransactions=True):
context.session.delete(floatingip)
- self.delete_port(context, floatingip['floating_port_id'])
+ self.delete_port(context, floatingip['floating_port_id'],
+ l3_port_check=False)
def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id)
self._make_floatingip_dict,
filters=filters, fields=fields)
+ def prevent_l3_port_deletion(self, context, port_id):
+ """ Checks to make sure a port is allowed to be deleted, raising
+ an exception if this is not the case. This should be called by
+ any plugin when the API requests the deletion of a port, since
+ some ports for L3 are not intended to be deleted directly via a
+ DELETE to /ports, but rather via other API calls that perform the
+ proper deletion checks.
+ """
+ port_db = self._get_port(context, port_id)
+ if port_db['device_owner'] in [DEVICE_OWNER_ROUTER_INTF,
+ DEVICE_OWNER_ROUTER_GW,
+ DEVICE_OWNER_FLOATINGIP]:
+ raise l3.L3PortInUse(port_id=port_id,
+ device_owner=port_db['device_owner'])
+
def disassociate_floatingips(self, context, port_id):
with context.session.begin(subtransactions=True):
try:
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
- def port(self, subnet=None, fixed_ips=None, fmt='json', **kwargs):
+ def port(self, subnet=None, fixed_ips=None, fmt='json', no_delete=False,
+ **kwargs):
if not subnet:
with self.subnet() as subnet:
net_id = subnet['subnet']['network_id']
port = self._make_port(fmt, net_id, fixed_ips=fixed_ips,
**kwargs)
yield port
- self._delete('ports', port['port']['id'])
+ if not no_delete:
+ self._delete('ports', port['port']['id'])
else:
net_id = subnet['subnet']['network_id']
port = self._make_port(fmt, net_id, fixed_ips=fixed_ips,
**kwargs)
yield port
- self._delete('ports', port['port']['id'])
+ if not no_delete:
+ self._delete('ports', port['port']['id'])
class TestBasicGet(QuantumDbPluginV2TestCase):
l3_db.L3_NAT_db_mixin):
supported_extension_aliases = ["os-quantum-router"]
- def delete_port(self, context, id):
+ def delete_port(self, context, id, l3_port_check=True):
+ if l3_port_check:
+ self.prevent_l3_port_deletion(context, id)
self.disassociate_floatingips(context, id)
return super(TestL3NatPlugin, self).delete_port(context, id)
def test_router_add_interface_port(self):
with self.router() as r:
- with self.port() as p:
+ with self.port(no_delete=True) as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
body = self._show('ports', p['port']['id'])
self.assertEquals(body['port']['device_id'], r['router']['id'])
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
def test_router_add_interface_dup_subnet1(self):
with self.router() as r:
with self.subnet() as s:
def test_router_add_interface_dup_subnet2(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(subnet=s) as p1:
+ with self.port(subnet=s, no_delete=True) as p1:
with self.port(subnet=s) as p2:
self._router_interface_action('add',
r['router']['id'],
p2['port']['id'],
expected_code=
exc.HTTPBadRequest.code)
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p1['port']['id'])
def test_router_add_interface_no_data(self):
with self.router() as r:
def test_router_remove_router_interface_wrong_subnet_returns_409(self):
with self.router() as r:
with self.subnet() as s:
- with self.port() as p:
+ with self.port(no_delete=True) as p:
self._router_interface_action('add',
r['router']['id'],
None,
s['subnet']['id'],
p['port']['id'],
exc.HTTPConflict.code)
+ #remove properly to clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
- def test_router_remove_router_interface_wrong_port_returns_409(self):
+ def test_router_remove_router_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet() as s:
- with self.port() as p:
+ with self.port(no_delete=True) as p:
self._router_interface_action('add',
r['router']['id'],
None,
r['router']['id'],
None,
p2['port']['id'],
- exc.HTTPConflict.code)
+ exc.HTTPNotFound.code)
+ # remove correct interface to cleanup
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
# remove extra port created
self._delete('ports', p2['port']['id'])
{'port_id': port_id}},
expected_code=exc.HTTPConflict.code)
+ def test_floating_ip_direct_port_delete_returns_409(self):
+ found = False
+ with self.floatingip_with_assoc() as fip:
+ for p in self._list('ports')['ports']:
+ if p['device_owner'] == 'network:floatingip':
+ self._delete('ports', p['id'],
+ expected_code=exc.HTTPConflict.code)
+ found = True
+ self.assertTrue(found)
+
def test_create_floatingip_no_ext_gateway_return_404(self):
with self.subnet() as public_sub:
with self.port() as private_port: