message = _("Quota exceeded for Vcns resource: %(overs)s: %(err_msg)s")
+class RouterInUseByLBService(n_exc.InUse):
+ message = _("Router %(router_id)s is in use by Loadbalancer Service "
+ "%(vip_id)s")
+
+
+class RouterInUseByFWService(n_exc.InUse):
+ message = _("Router %(router_id)s is in use by firewall Service "
+ "%(firewall_id)s")
+
+
class VcnsDriverException(NsxPluginException):
message = _("Error happened in NSX VCNS Driver: %(err_msg)s")
from neutron.extensions import firewall as fw_ext
from neutron.extensions import l3
from neutron.extensions import routedserviceinsertion as rsi
+from neutron.extensions import vpnaas as vpn_ext
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants as service_constants
lrouter['status'] = service_constants.PENDING_CREATE
return lrouter
+ def check_router_in_use(self, context, router_id):
+ router_filter = {'router_id': [router_id]}
+ vpnservices = self.get_vpnservices(
+ context, filters={'router_id': [router_id]})
+ if vpnservices:
+ raise vpn_ext.RouterInUseByVPNService(
+ router_id=router_id,
+ vpnservice_id=vpnservices[0]['id'])
+ vips = self.get_vips(
+ context, filters=router_filter)
+ if vips:
+ raise nsx_exc.RouterInUseByLBService(
+ router_id=router_id,
+ vip_id=vips[0]['id'])
+ firewalls = self.get_firewalls(
+ context, filters=router_filter)
+ if firewalls:
+ raise nsx_exc.RouterInUseByFWService(
+ router_id=router_id,
+ firewall_id=firewalls[0]['id'])
+
def _delete_lrouter(self, context, router_id, nsx_router_id):
binding = vcns_db.get_vcns_router_binding(context.session, router_id)
if not binding:
super(NsxAdvancedPlugin, self)._delete_lrouter(
context, router_id, nsx_router_id)
else:
+ #Check whether router has an advanced service inserted.
+ self.check_router_in_use(context, router_id)
vcns_db.update_vcns_router_binding(
context.session, router_id,
status=service_constants.PENDING_DELETE)
firewall.FirewallNotFound,
self.plugin.get_firewall, ctx, fw_id)
+ def test_delete_router_in_use_by_fwservice(self):
+ router_id = self._create_and_get_router()
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ with self.firewall(name='fw',
+ firewall_policy_id=fwp_id,
+ router_id=router_id,
+ admin_state_up=
+ test_db_firewall.ADMIN_STATE_UP,
+ expected_res_status=201):
+ self._delete('routers', router_id,
+ expected_code=webob.exc.HTTPConflict.code)
+
def test_show_firewall(self):
name = "firewall1"
attrs = self._get_test_firewall_attrs(name)
with self.vip(
router_id=self._create_and_get_router(),
pool=pool, subnet=subnet, no_delete=True) as vip:
- req = self.new_delete_request('vips',
- vip['vip']['id'])
+ req = self.new_delete_request('vips', vip['vip']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
+ def test_delete_router_in_use_by_lbservice(self):
+ router_id = self._create_and_get_router()
+ with contextlib.nested(
+ self.subnet(),
+ self.health_monitor(),
+ self.pool()
+ ) as (subnet, monitor, pool):
+ net_id = subnet['subnet']['network_id']
+ self._set_net_external(net_id)
+ self.plugin.create_pool_health_monitor(
+ context.get_admin_context(),
+ monitor, pool['pool']['id']
+ )
+ with self.vip(
+ router_id=router_id,
+ pool=pool, subnet=subnet):
+ self._delete('routers', router_id,
+ expected_code=web_exc.HTTPConflict.code)
+
def test_show_vip(self):
router_id = self._create_and_get_router()
name = "vip_show"
for k, v in expected:
self.assertEqual(res['vpnservice'][k], v)
+ def test_delete_vpnservice(self):
+ """Test case to delete a vpnservice."""
+ with self.subnet(cidr='10.2.0.0/24') as subnet:
+ with self.router() as router:
+ with self.vpnservice(name='vpnservice',
+ subnet=subnet,
+ router=router,
+ no_delete=True) as vpnservice:
+ req = self.new_delete_request(
+ 'vpnservices', vpnservice['vpnservice']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+
+ def test_delete_router_in_use_by_vpnservice(self):
+ """Test delete router in use by vpn service."""
+ with self.subnet(cidr='10.2.0.0/24') as subnet:
+ with self.router() as router:
+ with self.vpnservice(subnet=subnet,
+ router=router):
+ self._delete('routers', router['router']['id'],
+ expected_code=webob.exc.HTTPConflict.code)
+
def _test_create_ipsec_site_connection(self, key_overrides=None,
ike_key_overrides=None,
ipsec_key_overrides=None,