When using LBaaS and trying to delete a subnet, neutron has no way of
knowing if the subnet is associated to some pool. As a result, the
subnet is deleted but the pool remains associated to the (now
nonexistent) subnet_id. This patch lays the ground-work for adding a
check in LBaaS' side to prevent such cases.
Related-Bug: #
1413817
Change-Id: I3d5e231b67c72ffd919c92d65b57da56c63e053c
ROUTER_INTERFACE = 'router_interface'
SECURITY_GROUP = 'security_group'
SECURITY_GROUP_RULE = 'security_group_rule'
+SUBNET = 'subnet'
VALID = (
PORT,
ROUTER_INTERFACE,
SECURITY_GROUP,
SECURITY_GROUP_RULE,
+ SUBNET,
)
class SubnetInUse(InUse):
message = _("Unable to complete operation on subnet %(subnet_id)s. "
- "One or more ports have an IP allocation from this subnet.")
+ "%(reason)s")
+
+ def __init__(self, **kwargs):
+ if 'reason' not in kwargs:
+ kwargs['reason'] = _("One or more ports have an IP allocation "
+ "from this subnet.")
+ super(SubnetInUse, self).__init__(**kwargs)
class PortInUse(InUse):
from sqlalchemy.orm import exc
from neutron.api.v2 import attributes
+from neutron.callbacks import events
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
AUTO_DELETE_PORT_OWNERS = [constants.DEVICE_OWNER_DHCP]
+def _check_subnet_not_used(context, subnet_id):
+ try:
+ kwargs = {'context': context, 'subnet_id': subnet_id}
+ registry.notify(
+ resources.SUBNET, events.BEFORE_DELETE, None, **kwargs)
+ except exceptions.CallbackFailure as e:
+ raise n_exc.SubnetInUse(subnet_id=subnet_id, reason=e)
+
+
class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
common_db_mixin.CommonDbMixin):
"""V2 Neutron plugin interface implementation using SQLAlchemy models.
def delete_subnet(self, context, id):
with context.session.begin(subtransactions=True):
subnet = self._get_subnet(context, id)
+
+ # Make sure the subnet isn't used by other resources
+ _check_subnet_not_used(context, id)
+
# Delete all network owned ports
qry_network_ports = (
context.session.query(models_v2.IPAllocation).
raise os_db_exception.RetryRequest(
exc.SubnetInUse(subnet_id=id))
+ db_base_plugin_v2._check_subnet_not_used(context, id)
+
# If allocated is None, then all the IPAllocation were
# correctly deleted during the previous pass.
if not allocated:
from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as exc
+from neutron.db import db_base_plugin_v2
from neutron.db import portbindings_base
from neutron.extensions import external_net
from neutron.extensions import portbindings
belonging to the specified tenant.
"""
+ db_base_plugin_v2._check_subnet_not_used(context, subnet_id)
self._delete_resource('subnet', context, subnet_id)
def get_subnets(self, context, filters=None, fields=None):
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.api.v2 import router
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
+ def test_delete_subnet_with_callback(self):
+ with contextlib.nested(
+ self.subnet(),
+ mock.patch.object(registry, 'notify')) as (subnet, notify):
+
+ errors = [
+ exceptions.NotificationError(
+ 'fake_id', n_exc.NeutronException()),
+ ]
+ notify.side_effect = [
+ exceptions.CallbackFailure(errors=errors), None
+ ]
+
+ # Make sure the delete request fails
+ delete_request = self.new_delete_request('subnets',
+ subnet['subnet']['id'])
+ delete_response = delete_request.get_response(self.api)
+
+ self.assertTrue('NeutronError' in delete_response.json)
+ self.assertEqual('SubnetInUse',
+ delete_response.json['NeutronError']['type'])
+
+ # Make sure the subnet wasn't deleted
+ list_request = self.new_list_request(
+ 'subnets', params="id=%s" % subnet['subnet']['id'])
+ list_response = list_request.get_response(self.api)
+ self.assertEqual(subnet['subnet']['id'],
+ list_response.json['subnets'][0]['id'])
+
def _helper_test_validate_subnet(self, option, exception):
cfg.CONF.set_override(option, 0)
with self.network() as network: