raise n_exc.NoNetworkAvailable()
-def alloc_network(db_session, network_profile_id):
+def alloc_network(db_session, network_profile_id, tenant_id):
"""
Allocate network using first available free segment ID in segment range.
"""
with db_session.begin(subtransactions=True):
network_profile = get_network_profile(db_session,
- network_profile_id)
+ network_profile_id, tenant_id)
if network_profile.segment_type == c_const.NETWORK_TYPE_VLAN:
return reserve_vlan(db_session, network_profile)
if network_profile.segment_type == c_const.NETWORK_TYPE_OVERLAY:
return net_profile
-def delete_network_profile(db_session, id):
+def delete_network_profile(db_session, id, tenant_id=None):
"""Delete Network Profile."""
LOG.debug("delete_network_profile()")
with db_session.begin(subtransactions=True):
try:
- network_profile = get_network_profile(db_session, id)
+ network_profile = get_network_profile(db_session, id, tenant_id)
db_session.delete(network_profile)
(db_session.query(n1kv_models_v2.ProfileBinding).
filter_by(profile_id=id).delete())
raise c_exc.ProfileTenantBindingNotFound(profile_id=id)
-def update_network_profile(db_session, id, network_profile):
+def update_network_profile(db_session, id, network_profile, tenant_id=None):
"""Update Network Profile."""
LOG.debug("update_network_profile()")
with db_session.begin(subtransactions=True):
- profile = get_network_profile(db_session, id)
+ profile = get_network_profile(db_session, id, tenant_id)
profile.update(network_profile)
return profile
-def get_network_profile(db_session, id):
+def get_network_profile(db_session, id, tenant_id=None):
"""Get Network Profile."""
LOG.debug("get_network_profile()")
+ if tenant_id and c_conf.CISCO_N1K.restrict_network_profiles:
+ if _profile_binding_exists(db_session=db_session,
+ tenant_id=tenant_id,
+ profile_id=id,
+ profile_type=c_const.NETWORK) is None:
+ raise c_exc.ProfileTenantBindingNotFound(profile_id=id)
try:
- return db_session.query(
- n1kv_models_v2.NetworkProfile).filter_by(id=id).one()
+ return db_session.query(n1kv_models_v2.NetworkProfile).filter_by(
+ id=id).one()
except exc.NoResultFound:
raise c_exc.NetworkProfileNotFound(profile=id)
"""
# Check whether the network profile is in use.
if self._segment_in_use(context.session,
- get_network_profile(context.session, id)):
+ get_network_profile(context.session, id,
+ context.tenant_id)):
raise c_exc.NetworkProfileInUse(profile=id)
# Delete and return the network profile if it is not in use.
- _profile = delete_network_profile(context.session, id)
+ _profile = delete_network_profile(context.session, id,
+ context.tenant_id)
return self._make_network_profile_dict(_profile)
def update_network_profile(self, context, id, network_profile):
# Flag to check whether network profile is updated or not.
is_updated = False
p = network_profile["network_profile"]
- original_net_p = get_network_profile(context.session, id)
+ original_net_p = get_network_profile(context.session, id,
+ context.tenant_id)
# Update network profile to tenant id binding.
if context.is_admin and c_const.ADD_TENANTS in p:
profile_bindings = _get_profile_bindings_by_uuid(context.session,
p.get("segment_range") != original_net_p.segment_range):
if not self._segment_in_use(context.session, original_net_p):
delete_segment_allocations(context.session, original_net_p)
- updated_net_p = update_network_profile(context.session, id, p)
+ updated_net_p = update_network_profile(context.session, id, p,
+ context.tenant_id)
self._validate_segment_range_uniqueness(context,
updated_net_p, id)
if original_net_p.segment_type == c_const.NETWORK_TYPE_VLAN:
# Return network profile if it is successfully updated.
if is_updated:
return self._make_network_profile_dict(
- update_network_profile(context.session, id, p))
+ update_network_profile(context.session, id, p,
+ context.tenant_id))
def get_network_profile(self, context, id, fields=None):
"""
profile dictionary. Only these fields will be returned
:returns: network profile dictionary
"""
- profile = get_network_profile(context.session, id)
+ profile = get_network_profile(context.session, id, context.tenant_id)
return self._make_network_profile_dict(profile, fields)
def get_network_profiles(self, context, filters=None, fields=None):
:returns: true if network profile exist else False
"""
try:
- get_network_profile(context.session, id)
+ get_network_profile(context.session, id, context.tenant_id)
return True
except c_exc.NetworkProfileNotFound(profile=id):
return False
PHYS_NET = 'some-phys-net'
VLAN_MIN = 100
VLAN_MAX = 110
+TENANT_NOT_ADMIN = 'not_admin'
+TENANT_TEST = 'test'
class FakeResponse(object):
profile['physical_network'] = PHYS_NET
net_p = n1kv_db_v2.create_network_profile(db_session, profile)
n1kv_db_v2.sync_vlan_allocations(db_session, net_p)
+ n1kv_db_v2.create_profile_binding(db_session, self.tenant_id,
+ net_p['id'], c_const.NETWORK)
+ n1kv_db_v2.create_profile_binding(db_session, TENANT_NOT_ADMIN,
+ net_p['id'], c_const.NETWORK)
+ n1kv_db_v2.create_profile_binding(db_session, TENANT_TEST,
+ net_p['id'], c_const.NETWORK)
return net_p
def setUp(self, ext_mgr=NetworkProfileTestExtensionManager()):
self.new_create_request('network_profiles', net_p_dict)
bindings = (db_session.query(n1kv_models_v2.ProfileBinding).filter_by(
profile_type="network"))
- self.assertEqual(bindings.count(), 0)
+ self.assertEqual(3, bindings.count())
def test_create_network_profile_with_old_add_tenant_fail(self):
data = self._prepare_net_profile_data('vlan')
net_p['network_profile']['id'])
self.assertIsNotNone(tenant4)
+ def test_get_network_profile_restricted(self):
+ c_conf.CONF.set_override('restrict_network_profiles', True,
+ 'CISCO_N1K')
+ ctx1 = context.Context(user_id='admin',
+ tenant_id='tenant1',
+ is_admin=True)
+ sess1 = db.get_session()
+ net_p = self._make_test_profile(name='netp1')
+ n1kv_db_v2.create_profile_binding(sess1, ctx1.tenant_id,
+ net_p['id'], c_const.NETWORK)
+ #network profile binding with creator tenant should always exist
+ profile = n1kv_db_v2.get_network_profile(sess1, net_p['id'],
+ ctx1.tenant_id)
+ self.assertIsNotNone(profile)
+ ctx2 = context.Context(user_id='non_admin',
+ tenant_id='tenant2',
+ is_admin=False)
+ sess2 = db.get_session()
+ self.assertRaises(c_exc.ProfileTenantBindingNotFound,
+ n1kv_db_v2.get_network_profile,
+ sess2, net_p['id'], ctx2.tenant_id)
+
+ def test_get_network_profile_unrestricted(self):
+ c_conf.CONF.set_override('restrict_network_profiles', False,
+ 'CISCO_N1K')
+ ctx1 = context.Context(user_id='admin',
+ tenant_id='tenant1',
+ is_admin=True)
+ sess1 = db.get_session()
+ net_p = self._make_test_profile(name='netp1')
+ n1kv_db_v2.create_profile_binding(sess1, ctx1.tenant_id,
+ net_p['id'], c_const.NETWORK)
+ # network profile binding with creator tenant should always exist
+ profile = n1kv_db_v2.get_network_profile(sess1, net_p['id'],
+ ctx1.tenant_id)
+ self.assertIsNotNone(profile)
+ ctx2 = context.Context(user_id='non_admin',
+ tenant_id='tenant2',
+ is_admin=False)
+ sess2 = db.get_session()
+ profile = n1kv_db_v2.get_network_profile(sess2, net_p['id'],
+ ctx2.tenant_id)
+ #network profile will be returned even though the profile is
+ #not bound to tenant of sess2
+ self.assertIsNotNone(profile)
+
class TestN1kvBasicGet(test_plugin.TestBasicGet,
N1kvPluginTestCase):