from sqlalchemy import sql
from neutron.api.v2 import attributes
+from neutron.common import constants
from neutron.common import exceptions as n_exc
import neutron.db.api as db
from neutron.db import models_v2
db_session.add(binding)
+def delete_segment_allocations(db_session, net_p):
+ """
+ Delete the segment allocation entry from the table.
+
+ :params db_session: database session
+ :params net_p: network profile object
+ """
+ with db_session.begin(subtransactions=True):
+ seg_min, seg_max = get_segment_range(net_p)
+ if net_p['segment_type'] == c_const.NETWORK_TYPE_VLAN:
+ db_session.query(n1kv_models_v2.N1kvVlanAllocation).filter(
+ (n1kv_models_v2.N1kvVlanAllocation.physical_network ==
+ net_p['physical_network']),
+ (n1kv_models_v2.N1kvVlanAllocation.vlan_id >= seg_min),
+ (n1kv_models_v2.N1kvVlanAllocation.vlan_id <=
+ seg_max)).delete()
+ elif net_p['segment_type'] == c_const.NETWORK_TYPE_OVERLAY:
+ db_session.query(n1kv_models_v2.N1kvVxlanAllocation).filter(
+ (n1kv_models_v2.N1kvVxlanAllocation.vxlan_id >= seg_min),
+ (n1kv_models_v2.N1kvVxlanAllocation.vxlan_id <=
+ seg_max)).delete()
+
+
def sync_vlan_allocations(db_session, net_p):
"""
Synchronize vlan_allocations table with configured VLAN ranges.
"physical_network": network_profile["physical_network"]}
return self._fields(res, fields)
+ def _segment_in_use(self, db_session, network_profile):
+ """Verify whether a segment is allocated for given network profile."""
+ with db_session.begin(subtransactions=True):
+ return (db_session.query(n1kv_models_v2.N1kvNetworkBinding).
+ filter_by(profile_id=network_profile['id'])).first()
+
def get_network_profile_bindings(self, context, filters=None, fields=None):
"""
Retrieve a list of profile bindings for network profiles.
:param id: UUID representing network profile to delete
:returns: deleted network profile dictionary
"""
+ # Check whether the network profile is in use.
+ if self._segment_in_use(context.session,
+ get_network_profile(context.session, id)):
+ raise c_exc.NetworkProfileInUse(profile=id)
+ # Delete and return the network profile if it is not in use.
_profile = delete_network_profile(context.session, id)
return self._make_network_profile_dict(_profile)
:param network_profile: network profile dictionary
:returns: updated network profile dictionary
"""
+ # Flag to check whether network profile is updated or not.
+ is_updated = False
p = network_profile["network_profile"]
+ original_net_p = get_network_profile(context.session, id)
+ # Update network profile to tenant id binding.
if context.is_admin and "add_tenant" in p:
- self.add_network_profile_tenant(context.session,
- id,
+ self.add_network_profile_tenant(context.session, id,
p["add_tenant"])
- return self._make_network_profile_dict(get_network_profile(
- context.session, id))
+ is_updated = True
if context.is_admin and "remove_tenant" in p:
delete_profile_binding(context.session, p["remove_tenant"], id)
- return self._make_network_profile_dict(get_network_profile(
- context.session, id))
- return self._make_network_profile_dict(
- update_network_profile(context.session, id, p))
+ is_updated = True
+ if original_net_p.segment_type == c_const.NETWORK_TYPE_TRUNK:
+ #TODO(abhraut): Remove check when Trunk supports segment range.
+ if p.get('segment_range'):
+ msg = _("segment_range not required for TRUNK")
+ LOG.error(msg)
+ raise n_exc.InvalidInput(error_message=msg)
+ if original_net_p.segment_type in [c_const.NETWORK_TYPE_VLAN,
+ c_const.NETWORK_TYPE_TRUNK]:
+ if p.get("multicast_ip_range"):
+ msg = _("multicast_ip_range not required")
+ LOG.error(msg)
+ raise n_exc.InvalidInput(error_message=msg)
+ # Update segment range if network profile is not in use.
+ if (p.get("segment_range") and
+ p.get("segment_range") != original_net_p.segment_range):
+ if not self._segment_in_use(context.session, original_net_p):
+ delete_segment_allocations(context.session, original_net_p)
+ updated_net_p = update_network_profile(context.session, id, p)
+ self._validate_segment_range_uniqueness(context,
+ updated_net_p, id)
+ if original_net_p.segment_type == c_const.NETWORK_TYPE_VLAN:
+ sync_vlan_allocations(context.session, updated_net_p)
+ if original_net_p.segment_type == c_const.NETWORK_TYPE_OVERLAY:
+ sync_vxlan_allocations(context.session, updated_net_p)
+ is_updated = True
+ else:
+ raise c_exc.NetworkProfileInUse(profile=id)
+ if (p.get('multicast_ip_range') and
+ (p.get("multicast_ip_range") !=
+ original_net_p.get("multicast_ip_range"))):
+ self._validate_multicast_ip_range(p)
+ if not self._segment_in_use(context.session, original_net_p):
+ is_updated = True
+ else:
+ raise c_exc.NetworkProfileInUse(profile=id)
+ # Update network profile if name is updated and the network profile
+ # is not yet updated.
+ if "name" in p and not is_updated:
+ is_updated = True
+ # Return network profile if it is successfully updated.
+ if is_updated:
+ return self._make_network_profile_dict(
+ update_network_profile(context.session, id, p))
def get_network_profile(self, context, id, fields=None):
"""
return False
def _get_segment_range(self, data):
- # Sort the range to ensure min, max is in order
- return sorted(int(seg) for seg in data.split("-")[:2])
+ return (int(seg) for seg in data.split("-")[:2])
def _validate_network_profile_args(self, context, p):
"""
:param p: network profile object
"""
self._validate_network_profile(p)
- self._validate_segment_range_uniqueness(context, p)
+ segment_type = p['segment_type'].lower()
+ if segment_type != c_const.NETWORK_TYPE_TRUNK:
+ self._validate_segment_range_uniqueness(context, p)
def _validate_segment_range(self, network_profile):
"""
msg = _("%s is not a valid multicast ip address") % ip
LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
+ if netaddr.IPAddress(ip) <= netaddr.IPAddress('224.0.0.255'):
+ msg = _("%s is reserved multicast ip address") % ip
+ LOG.error(msg)
+ raise n_exc.InvalidInput(error_message=msg)
except netaddr.AddrFormatError:
msg = _("%s is not a valid ip address") % ip
LOG.error(msg)
if any(net_p[arg] == "" for arg in ["segment_type"]):
msg = _("Arguments segment_type missing"
" for network profile")
- LOG.exception(msg)
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
segment_type = net_p["segment_type"].lower()
if segment_type not in [c_const.NETWORK_TYPE_VLAN,
c_const.NETWORK_TYPE_MULTI_SEGMENT]:
msg = _("segment_type should either be vlan, overlay, "
"multi-segment or trunk")
- LOG.exception(msg)
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
if segment_type == c_const.NETWORK_TYPE_VLAN:
if "physical_network" not in net_p:
msg = _("Argument physical_network missing "
"for network profile")
- LOG.exception(msg)
+ LOG.error(msg)
+ raise n_exc.InvalidInput(error_message=msg)
+ if segment_type == c_const.NETWORK_TYPE_TRUNK:
+ if net_p["segment_range"]:
+ msg = _("segment_range not required for trunk")
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
if segment_type in [c_const.NETWORK_TYPE_TRUNK,
c_const.NETWORK_TYPE_OVERLAY]:
if not attributes.is_attr_set(net_p.get("sub_type")):
msg = _("Argument sub_type missing "
"for network profile")
- LOG.exception(msg)
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
if segment_type in [c_const.NETWORK_TYPE_VLAN,
c_const.NETWORK_TYPE_OVERLAY]:
if "segment_range" not in net_p:
msg = _("Argument segment_range missing "
"for network profile")
- LOG.exception(msg)
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
self._validate_segment_range(net_p)
if segment_type == c_const.NETWORK_TYPE_OVERLAY:
else:
net_p['multicast_ip_range'] = '0.0.0.0'
- def _validate_segment_range_uniqueness(self, context, net_p):
+ def _validate_segment_range_uniqueness(self, context, net_p, id=None):
"""
Validate that segment range doesn't overlap.
:param context: neutron api request context
:param net_p: network profile dictionary
+ :param id: UUID representing the network profile being updated
"""
segment_type = net_p["segment_type"].lower()
+ seg_min, seg_max = self._get_segment_range(net_p['segment_range'])
if segment_type == c_const.NETWORK_TYPE_VLAN:
+ if not ((seg_min <= seg_max) and
+ ((seg_min in range(constants.MIN_VLAN_TAG,
+ c_const.NEXUS_VLAN_RESERVED_MIN) and
+ seg_max in range(constants.MIN_VLAN_TAG,
+ c_const.NEXUS_VLAN_RESERVED_MIN)) or
+ (seg_min in range(c_const.NEXUS_VLAN_RESERVED_MAX + 1,
+ constants.MAX_VLAN_TAG) and
+ seg_max in range(c_const.NEXUS_VLAN_RESERVED_MAX + 1,
+ constants.MAX_VLAN_TAG)))):
+ msg = (_("Segment range is invalid, select from "
+ "%(min)s-%(nmin)s, %(nmax)s-%(max)s") %
+ {"min": constants.MIN_VLAN_TAG,
+ "nmin": c_const.NEXUS_VLAN_RESERVED_MIN - 1,
+ "nmax": c_const.NEXUS_VLAN_RESERVED_MAX + 1,
+ "max": constants.MAX_VLAN_TAG - 1})
+ LOG.error(msg)
+ raise n_exc.InvalidInput(error_message=msg)
profiles = _get_network_profiles(
db_session=context.session,
physical_network=net_p["physical_network"]
)
- else:
+ elif segment_type in [c_const.NETWORK_TYPE_OVERLAY,
+ c_const.NETWORK_TYPE_MULTI_SEGMENT,
+ c_const.NETWORK_TYPE_TRUNK]:
+ if (seg_min > seg_max or
+ seg_min < c_const.NEXUS_VXLAN_MIN or
+ seg_max > c_const.NEXUS_VXLAN_MAX):
+ msg = (_("segment range is invalid. Valid range is : "
+ "%(min)s-%(max)s") %
+ {"min": c_const.NEXUS_VXLAN_MIN,
+ "max": c_const.NEXUS_VXLAN_MAX})
+ LOG.error(msg)
+ raise n_exc.InvalidInput(error_message=msg)
profiles = _get_network_profiles(db_session=context.session)
if profiles:
for profile in profiles:
+ if id and profile.id == id:
+ continue
name = profile.name
segment_range = profile.segment_range
if net_p["name"] == name:
msg = (_("NetworkProfile name %s already exists"),
net_p["name"])
- LOG.exception(msg)
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
if (c_const.NETWORK_TYPE_MULTI_SEGMENT in
[profile.segment_type, net_p["segment_type"]] or
((seg_min <= profile_seg_min) and
(seg_max >= profile_seg_max))):
msg = _("Segment range overlaps with another profile")
- LOG.exception(msg)
+ LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
def _get_network_profile_by_name(self, db_session, name):
update_res = update_req.get_response(self.ext_api)
self.assertEqual(update_res.status_int, 400)
+ def test_update_network_profiles_with_networks_fail(self):
+ net_p = self._make_test_profile(name='netp1')
+ data = {'network_profile': {'segment_range': '200-210'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 200)
+ net_data = {'network': {'name': 'net1',
+ n1kv.PROFILE_ID: net_p['id'],
+ 'tenant_id': 'some_tenant'}}
+ network_req = self.new_create_request('networks', net_data)
+ network_res = network_req.get_response(self.api)
+ self.assertEqual(network_res.status_int, 201)
+ data = {'network_profile': {'segment_range': '300-310'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 409)
+
def test_create_overlay_network_profile_invalid_multicast_fail(self):
net_p_dict = self._prepare_net_profile_data('overlay')
data = {'network_profile': {'sub_type': 'native_vxlan',
res = net_p_req.get_response(self.ext_api)
self.assertEqual(res.status_int, 201)
+ def test_update_overlay_network_profile_correct_multicast_pass(self):
+ data = self._prepare_net_profile_data('overlay')
+ net_p_req = self.new_create_request('network_profiles', data)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+ net_p = self.deserialize(self.fmt, res)
+ data = {'network_profile': {'multicast_ip_range':
+ '224.0.1.0-224.0.1.100'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['network_profile']['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 200)
+
+ def test_create_overlay_network_profile_reservedip_multicast_fail(self):
+ net_p_dict = self._prepare_net_profile_data('overlay')
+ data = {'network_profile': {'multicast_ip_range':
+ '224.0.0.100-224.0.1.100'}}
+ net_p_req = self.new_create_request('network_profiles', data,
+ net_p_dict)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_overlay_network_profile_reservedip_multicast_fail(self):
+ data = self._prepare_net_profile_data('overlay')
+ net_p_req = self.new_create_request('network_profiles', data)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+ net_p = self.deserialize(self.fmt, res)
+ data = {'network_profile': {'multicast_ip_range':
+ '224.0.0.11-224.0.0.111'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['network_profile']['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 400)
+
+ def test_update_vlan_network_profile_multicast_fail(self):
+ net_p = self._make_test_profile(name='netp1')
+ data = {'network_profile': {'multicast_ip_range':
+ '224.0.1.0-224.0.1.100'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 400)
+
+ def test_update_trunk_network_profile_segment_range_fail(self):
+ data = self._prepare_net_profile_data('trunk')
+ net_p_req = self.new_create_request('network_profiles', data)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+ net_p = self.deserialize(self.fmt, res)
+ data = {'network_profile': {'segment_range':
+ '100-200'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['network_profile']['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 400)
+
+ def test_update_trunk_network_profile_multicast_fail(self):
+ data = self._prepare_net_profile_data('trunk')
+ net_p_req = self.new_create_request('network_profiles', data)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+ net_p = self.deserialize(self.fmt, res)
+ data = {'network_profile': {'multicast_ip_range':
+ '224.0.1.0-224.0.1.100'}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['network_profile']['id'])
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(update_res.status_int, 400)
+
def test_create_network_profile_populate_vlan_segment_pool(self):
db_session = db.get_session()
net_p_dict = self._prepare_net_profile_data('vlan')
PHYS_NET,
VLAN_MAX + 1)
+ def test_delete_network_profile_with_network_fail(self):
+ net_p = self._make_test_profile(name='netp1')
+ net_data = {'network': {'name': 'net1',
+ n1kv.PROFILE_ID: net_p['id'],
+ 'tenant_id': 'some_tenant'}}
+ network_req = self.new_create_request('networks', net_data)
+ network_res = network_req.get_response(self.api)
+ self.assertEqual(network_res.status_int, 201)
+ self._delete('network_profiles', net_p['id'],
+ expected_code=409)
+
def test_delete_network_profile_deallocate_vlan_segment_pool(self):
db_session = db.get_session()
net_p_dict = self._prepare_net_profile_data('vlan')