from oslo_config import cfg
from oslo_log import log as logging
+from oslo_utils import excutils
from neutron.api.v2 import attributes
+from neutron.callbacks import events
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants as l3_const
from neutron.common import exceptions as n_exc
from neutron.common import utils as n_utils
raise NotImplementedError()
elif (not router_db.extra_attributes.distributed and
router_res.get('distributed')):
- # Add a check for Services FWaaS and VPNaaS
- # This check below ensures that the legacy routers with
- # associated VPNaaS or FWaaS services are not allowed to
- # migrate.
- if (self.check_router_has_no_vpnaas(context, router_db) and
- self.check_router_has_no_firewall(context, router_db)):
- LOG.info(_LI("No Service associated, so safe to migrate: %s "
- "listed"), router_db['id'])
-
- def check_router_has_no_firewall(self, context, router_db):
- """Check if FWaaS is associated with the legacy router."""
- fwaas_service = manager.NeutronManager.get_service_plugins().get(
- constants.FIREWALL)
- if fwaas_service:
- tenant_firewalls = fwaas_service.get_firewalls(
- context,
- filters={'tenant_id': [router_db['tenant_id']]})
- if tenant_firewalls:
- raise l3.RouterInUse(router_id=router_db['id'])
- return True
-
- def check_router_has_no_vpnaas(self, context, router_db):
- """Check if VPNaaS is associated with the legacy router."""
- vpn_plugin = manager.NeutronManager.get_service_plugins().get(
- constants.VPN)
- if vpn_plugin:
- vpn_plugin.check_router_in_use(context, router_db['id'])
- return True
+ # Notify advanced services of the imminent state transition
+ # for the router.
+ try:
+ kwargs = {'context': context, 'router': router_db}
+ registry.notify(
+ resources.ROUTER, events.BEFORE_UPDATE, self, **kwargs)
+ except exceptions.CallbackFailure as e:
+ with excutils.save_and_reraise_exception():
+ # NOTE(armax): preserve old check's behavior
+ if len(e.errors) == 1:
+ raise e.errors[0].error
+ raise l3.RouterInUse(router_id=router_db['id'],
+ reason=e)
def _update_distributed_attr(
self, context, router_id, router_db, data, gw_info):
from neutron.common import exceptions
from neutron import context
from neutron.db import l3_dvr_db
-from neutron.extensions import l3
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as plugin_const
self.assertFalse(create_fip.called)
self.assertFalse(delete_fip.called)
- def test__validate_router_migration_prevent_check_advanced_svc(self):
- router = {'name': 'foo_router', 'admin_state_up': True}
- router_db = self._create_router(router)
- # make sure the check are invoked, whether they pass or
- # raise, it does not matter in the context of this test
- with contextlib.nested(
- mock.patch.object(self.mixin, 'check_router_has_no_firewall'),
- mock.patch.object(self.mixin, 'check_router_has_no_vpnaas')
- ) as (check_fw, check_vpn):
- self.mixin._validate_router_migration(
- self.ctx, router_db, {'distributed': True})
- check_fw.assert_called_once_with(self.ctx, router_db)
- check_vpn.assert_called_once_with(self.ctx, router_db)
-
- def test_check_router_has_no_firewall_raises(self):
- with mock.patch.object(
- manager.NeutronManager, 'get_service_plugins') as sp:
- fw_plugin = mock.Mock()
- sp.return_value = {'FIREWALL': fw_plugin}
- fw_plugin.get_firewalls.return_value = [mock.ANY]
- self.assertRaises(
- l3.RouterInUse,
- self.mixin.check_router_has_no_firewall,
- self.ctx, {'id': 'foo_id', 'tenant_id': 'foo_tenant'})
-
- def test_check_router_has_no_firewall_passes(self):
- with mock.patch.object(manager.NeutronManager,
- 'get_service_plugins',
- return_value={}):
- self.assertTrue(
- self.mixin.check_router_has_no_firewall(mock.ANY, mock.ANY))
-
- def test_check_router_has_no_vpn(self):
- with mock.patch.object(
- manager.NeutronManager, 'get_service_plugins') as sp:
- vpn_plugin = mock.Mock()
- sp.return_value = {'VPN': vpn_plugin}
- self.mixin.check_router_has_no_vpnaas(mock.ANY, {'id': 'foo_id'})
- vpn_plugin.check_router_in_use.assert_called_once_with(
- mock.ANY, 'foo_id')
-
def test_remove_router_interface_delete_router_l3agent_binding(self):
interface_info = {'subnet_id': '123'}
router = mock.MagicMock()
self.assertTrue(plugin.get_l3_agents_hosting_routers.called)
self.assertTrue(plugin.check_ports_exist_on_l3agent.called)
self.assertTrue(plugin.remove_router_from_l3_agent.called)
+
+ def test__validate_router_migration_notify_advanced_services(self):
+ router = {'name': 'foo_router', 'admin_state_up': True}
+ router_db = self._create_router(router)
+ with mock.patch.object(l3_dvr_db.registry, 'notify') as mock_notify:
+ self.mixin._validate_router_migration(
+ self.ctx, router_db, {'distributed': True})
+ kwargs = {'context': self.ctx, 'router': router_db}
+ mock_notify.assert_called_once_with(
+ 'router', 'before_update', self.mixin, **kwargs)