# to prevent deadlock waiting to acquire a DB lock
# held by another thread in the same process, leading
# to 'lock wait timeout' errors.
+ #
+ # Process L3 first, since, depending on the L3 plugin, it may
+ # involve locking the db-access semaphore, sending RPC
+ # notifications, and/or calling delete_port on this plugin.
+ # Additionally, a rollback may not be enough to undo the
+ # deletion of a floating IP with certain L3 backends.
+ self._process_l3_delete(context, id)
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
- self._process_l3_delete(context, id)
# Get ports to auto-delete.
ports = (session.query(models_v2.Port).
enable_eagerloads(False).
from neutron.common import utils
from neutron import context
from neutron.db import db_base_plugin_v2 as base_plugin
+from neutron.extensions import external_net as external_net
from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
mock.call(_("The port '%s' was deleted"), 'invalid-uuid')
])
+ def test_l3_cleanup_on_net_delete(self):
+ l3plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ kwargs = {'arg_list': (external_net.EXTERNAL,),
+ external_net.EXTERNAL: True}
+ with self.network(**kwargs) as n:
+ with self.subnet(network=n, cidr='200.0.0.0/22'):
+ l3plugin.create_floatingip(
+ context.get_admin_context(),
+ {'floatingip': {'floating_network_id': n['network']['id'],
+ 'tenant_id': n['network']['tenant_id']}}
+ )
+ self._delete('networks', n['network']['id'])
+ flips = l3plugin.get_floatingips(context.get_admin_context())
+ self.assertFalse(flips)
+
def test_delete_port_no_notify_in_disassociate_floatingips(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()