from neutron.extensions import l3
from neutron.extensions import portbindings
from neutron.i18n import _LI
+from neutron import manager
from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants
LOG = logging.getLogger(__name__)
self._process_extra_attr_router_create(context, router_db, router)
return router_db
- def _validate_router_migration(self, router_db, router_res):
+ def _validate_router_migration(self, context, router_db, router_res):
"""Allow centralized -> distributed state transition only."""
if (router_db.extra_attributes.distributed and
router_res.get('distributed') is False):
LOG.info(_LI("Centralizing distributed router %s "
"is not supported"), router_db['id'])
raise NotImplementedError()
- # TODO(Swami): Add a check for Services FWaaS and VPNaaS
+ elif (not router_db.extra_attributes.distributed and
+ router_res.get('distributed')):
+ # Add a check for Services FWaaS and VPNaaS
+ # This check below ensures that the legacy routers with
+ # associated VPNaaS or FWaaS services are not allowed to
+ # migrate.
+ if (self.check_router_has_no_vpnaas(context, router_db) and
+ self.check_router_has_no_firewall(context, router_db)):
+ LOG.info(_LI("No Service associated, so safe to migrate: %s "
+ "listed"), router_db['id'])
+
+ def check_router_has_no_firewall(self, context, router_db):
+ """Check if FWaaS is associated with the legacy router."""
+ fwaas_service = manager.NeutronManager.get_service_plugins().get(
+ constants.FIREWALL)
+ if fwaas_service:
+ tenant_firewalls = fwaas_service.get_firewalls(
+ context,
+ filters={'tenant_id': [router_db['tenant_id']]})
+ if tenant_firewalls:
+ raise l3.RouterInUse(router_id=router_db['id'])
+ return True
+
+ def check_router_has_no_vpnaas(self, context, router_db):
+ """Check if VPNaaS is associated with the legacy router."""
+ vpn_plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.VPN)
+ if vpn_plugin:
+ vpn_plugin.check_router_in_use(context, router_db['id'])
+ return True
def _update_distributed_attr(
self, context, router_id, router_db, data, gw_info):
migrating_to_distributed = (
not router_db.extra_attributes.distributed and
data.get('distributed') is True)
- self._validate_router_migration(router_db, data)
+ self._validate_router_migration(context, router_db, data)
router_db.extra_attributes.update(data)
self._update_distributed_attr(
context, router_id, router_db, data, gw_info)
from neutron.common import constants as l3_const
from neutron import context
from neutron.db import l3_dvr_db
+from neutron.extensions import l3
+from neutron.extensions import vpnaas
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.tests.unit import testlib_api
}
router_db = self._create_router(router)
self.assertIsNone(self.mixin._validate_router_migration(
- router_db, {'name': 'foo_router_2'}))
+ self.ctx, router_db, {'name': 'foo_router_2'}))
def test__validate_router_migration_raise_error(self):
router = {
router_db = self._create_router(router)
self.assertRaises(NotImplementedError,
self.mixin._validate_router_migration,
- router_db, {'distributed': False})
+ self.ctx, router_db, {'distributed': False})
def test_update_router_db_centralized_to_distributed(self):
router = {'name': 'foo_router', 'admin_state_up': True}
self.mixin._update_fip_assoc(
self.ctx, fip, floatingip, mock.ANY)
self.assertTrue(vf.called)
+
+ def _router_migration_with_services_setup(
+ self, test_side_effect_vpn=None, test_side_effect_fw=None,
+ no_vpn=None, no_fw=None):
+ '''Helper function to test router migration with services.'''
+ router = {'name': 'foo_router',
+ 'admin_state_up': True}
+ router_db = self._create_router(router)
+ self.assertFalse(router_db.extra_attributes.distributed)
+ with contextlib.nested(
+ mock.patch.object(
+ self.mixin, 'check_router_has_no_vpnaas',
+ side_effect=test_side_effect_vpn, return_value=no_vpn),
+ mock.patch.object(
+ self.mixin, 'check_router_has_no_firewall',
+ side_effect=test_side_effect_fw, return_value=no_fw),
+ mock.patch.object(self.mixin, '_get_router',
+ return_value=router_db)
+ ) as (vpn_mock, fw_mock, r_mock):
+ router_db['status'] = 'ACTIVE'
+ if no_vpn and no_fw and (
+ test_side_effect_vpn and test_side_effect_fw) is None:
+ self.mixin._validate_router_migration(
+ self.ctx, router_db, {'distributed': True})
+ return router_db, fw_mock, vpn_mock
+ if not no_vpn and test_side_effect_vpn:
+ self.assertRaises(
+ vpnaas.RouterInUseByVPNService,
+ self.mixin._validate_router_migration,
+ self.ctx,
+ router_db,
+ {'distributed': True})
+ return router_db, fw_mock, vpn_mock
+ if not no_fw and test_side_effect_fw:
+ self.assertRaises(
+ l3.RouterInUse,
+ self.mixin._validate_router_migration,
+ self.ctx,
+ router_db,
+ {'distributed': True})
+ return router_db, fw_mock, vpn_mock
+
+ def test__validate_router_migration_fail_with_vpnservice(self):
+ '''Test to check router migration fail with vpn.'''
+ router_db, mock_firewall, mock_vpnaas = (
+ self._router_migration_with_services_setup(
+ test_side_effect_vpn=(
+ vpnaas.RouterInUseByVPNService(
+ router_id='fake_id',
+ vpnservice_id='fake_vpnaas_id')
+ ),
+ no_vpn=False,
+ no_fw=True))
+ mock_vpnaas.assert_called_once_with(self.ctx, router_db)
+
+ def test__validate_router_migration_fail_with_fwservice(self):
+ '''Test to check router migration with firewall.'''
+ router_db, mock_firewall, mock_vpnaas = (
+ self._router_migration_with_services_setup(
+ test_side_effect_vpn=None,
+ test_side_effect_fw=l3.RouterInUse(
+ router_id='fake_id'
+ ),
+ no_vpn=True,
+ no_fw=False))
+ mock_firewall.assert_called_once_with(self.ctx, router_db)
+
+ def test__validate_router_migration_with_no_services(self):
+ '''Test to check router migration with no services.'''
+ router_db, mock_firewall, mock_vpnaas = (
+ self._router_migration_with_services_setup(
+ no_vpn=True,
+ no_fw=True))
+ mock_vpnaas.assert_called_once_with(self.ctx, router_db)
+ mock_firewall.assert_called_once_with(self.ctx, router_db)