from sqlalchemy import orm
from sqlalchemy.orm import exc
+from oslo_utils import excutils
+
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
+from neutron.callbacks import events
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
router.gw_port = None
context.session.add(router)
context.session.expire(gw_port)
- vpnservice = manager.NeutronManager.get_service_plugins().get(
- constants.VPN)
- if vpnservice:
- vpnservice.check_router_in_use(context, router_id)
+ self._check_router_gw_port_in_use(context, router_id)
self._core_plugin.delete_port(
admin_ctx, gw_port['id'], l3_port_check=False)
+ def _check_router_gw_port_in_use(self, context, router_id):
+ try:
+ kwargs = {'context': context, 'router_id': router_id}
+ registry.notify(
+ resources.ROUTER_GATEWAY, events.BEFORE_DELETE, self, **kwargs)
+ except exceptions.CallbackFailure as e:
+ with excutils.save_and_reraise_exception():
+ # NOTE(armax): preserve old check's behavior
+ if len(e.errors) == 1:
+ raise e.errors[0].error
+ raise l3.RouterInUse(router_id=router_id, reason=e)
+
def _create_gw_port(self, context, router_id, router, new_network,
ext_ips, ext_ip_change):
new_valid_gw_port_attachment = (
subnet_db = self._core_plugin._get_subnet(context, subnet_id)
subnet_cidr = netaddr.IPNetwork(subnet_db['cidr'])
fip_qry = context.session.query(FloatingIP)
- vpnservice = manager.NeutronManager.get_service_plugins().get(
- constants.VPN)
- if vpnservice:
- vpnservice.check_subnet_in_use(context, subnet_id)
+ try:
+ kwargs = {'context': context, 'subnet_id': subnet_id}
+ registry.notify(
+ resources.ROUTER_INTERFACE,
+ events.BEFORE_DELETE, self, **kwargs)
+ except exceptions.CallbackFailure as e:
+ with excutils.save_and_reraise_exception():
+ # NOTE(armax): preserve old check's behavior
+ if len(e.errors) == 1:
+ raise e.errors[0].error
+ raise l3.RouterInUse(router_id=router_id, reason=e)
for fip_db in fip_qry.filter_by(router_id=router_id):
if netaddr.IPAddress(fip_db['fixed_ip_address']) in subnet_cidr:
raise l3.RouterInterfaceInUseByFloatingIP(
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron import context
s['subnet']['id'],
None)
+ def test_router_remove_interface_callback_failure_returns_409(self):
+ with contextlib.nested(
+ self.router(),
+ self.subnet(),
+ mock.patch.object(registry, 'notify')) as (r, s, notify):
+ errors = [
+ exceptions.NotificationError(
+ 'foo_callback_id', n_exc.InUse()),
+ ]
+ # we fail the first time, but not the second, when
+ # the clean-up takes place
+ notify.side_effect = [
+ exceptions.CallbackFailure(errors=errors), None
+ ]
+ self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
+ self._router_interface_action(
+ 'remove',
+ r['router']['id'],
+ s['subnet']['id'],
+ None,
+ exc.HTTPConflict.code)
+ # remove properly to clean-up
+ self._router_interface_action(
+ 'remove',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
+
+ def test_router_clear_gateway_callback_failure_returns_409(self):
+ with contextlib.nested(
+ self.router(),
+ self.subnet(),
+ mock.patch.object(registry, 'notify')) as (r, s, notify):
+ errors = [
+ exceptions.NotificationError(
+ 'foo_callback_id', n_exc.InUse()),
+ ]
+ notify.side_effect = exceptions.CallbackFailure(errors=errors)
+ self._set_net_external(s['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ s['subnet']['network_id'],
+ external_gw_info={},
+ expected_code=exc.HTTPConflict.code)
+
def test_router_remove_interface_wrong_subnet_returns_400(self):
with self.router() as r:
with self.subnet() as s: