CFG_AGENT = 'cisco_cfg_agent'
# Topic for routing service helper in Cisco configuration agent
CFG_AGENT_L3_ROUTING = 'cisco_cfg_agent_l3_routing'
+
+# Values for network profile fields
+ADD_TENANTS = 'add_tenants'
+REMOVE_TENANTS = 'remove_tenants'
def _profile_binding_exists(db_session, tenant_id, profile_id, profile_type):
+ """Check if the profile-tenant binding exists."""
LOG.debug(_("_profile_binding_exists()"))
+ db_session = db_session or db.get_session()
return (db_session.query(n1kv_models_v2.ProfileBinding).
filter_by(tenant_id=tenant_id, profile_id=profile_id,
profile_type=profile_type).first())
return
+def update_profile_binding(db_session, profile_id, tenants, profile_type):
+ """Updating Profile Binding."""
+ LOG.debug('update_profile_binding()')
+ if profile_type not in ("network", "policy"):
+ raise n_exc.NeutronException(_("Invalid profile type"))
+ db_session = db_session or db.get_session()
+ with db_session.begin(subtransactions=True):
+ db_session.query(n1kv_models_v2.ProfileBinding).filter_by(
+ profile_id=profile_id, profile_type=profile_type).delete()
+ new_tenants_set = set(tenants)
+ for tenant_id in new_tenants_set:
+ tenant = n1kv_models_v2.ProfileBinding(profile_type = profile_type,
+ tenant_id = tenant_id,
+ profile_id = profile_id)
+ db_session.add(tenant)
+
+
def _get_profile_bindings(db_session, profile_type=None):
"""
Retrieve a list of profile bindings.
context.tenant_id,
net_profile.id,
c_const.NETWORK)
- if p.get("add_tenant"):
- self.add_network_profile_tenant(context.session,
- net_profile.id,
- p["add_tenant"])
+ if p.get(c_const.ADD_TENANTS):
+ for tenant in p[c_const.ADD_TENANTS]:
+ self.add_network_profile_tenant(context.session,
+ net_profile.id,
+ tenant)
return self._make_network_profile_dict(net_profile)
def delete_network_profile(self, context, id):
p = network_profile["network_profile"]
original_net_p = get_network_profile(context.session, id)
# Update network profile to tenant id binding.
- if context.is_admin and "add_tenant" in p:
- self.add_network_profile_tenant(context.session, id,
- p["add_tenant"])
+ if context.is_admin and c_const.ADD_TENANTS in p:
+ if context.tenant_id not in p[c_const.ADD_TENANTS]:
+ p[c_const.ADD_TENANTS].append(context.tenant_id)
+ update_profile_binding(context.session, id,
+ p[c_const.ADD_TENANTS], c_const.NETWORK)
is_updated = True
- if context.is_admin and "remove_tenant" in p:
- delete_profile_binding(context.session, p["remove_tenant"], id)
+ if context.is_admin and c_const.REMOVE_TENANTS in p:
+ for remove_tenant in p[c_const.REMOVE_TENANTS]:
+ if remove_tenant == context.tenant_id:
+ continue
+ delete_profile_binding(context.session, remove_tenant, id)
is_updated = True
if original_net_p.segment_type == c_const.NETWORK_TYPE_TRUNK:
#TODO(abhraut): Remove check when Trunk supports segment range.
'is_visible': True, 'default': ''},
'tenant_id': {'allow_post': True, 'allow_put': False,
'is_visible': False, 'default': ''},
- 'add_tenant': {'allow_post': True, 'allow_put': True,
- 'is_visible': True, 'default': None},
- 'remove_tenant': {'allow_post': True, 'allow_put': True,
- 'is_visible': True, 'default': None},
+ 'add_tenants': {'allow_post': True, 'allow_put': True,
+ 'is_visible': True, 'default': None,
+ 'convert_to': attributes.convert_none_to_empty_list},
+ 'remove_tenants': {
+ 'allow_post': True, 'allow_put': True,
+ 'is_visible': True, 'default': None,
+ 'convert_to': attributes.convert_none_to_empty_list,
+ },
},
'network_profile_bindings': {
'profile_id': {'allow_post': False, 'allow_put': False,
except s_exc.NoResultFound:
self.fail("Could not create Profile Binding")
+ def test_update_profile_binding(self):
+ test_tenant_id = "d434dd90-76ec-11e2-bcfd-0800200c9a66"
+ test_profile_id = "dd7b9741-76ec-11e2-bcfd-0800200c9a66"
+ test_profile_type = "network"
+ n1kv_db_v2.create_profile_binding(self.session,
+ test_tenant_id,
+ test_profile_id,
+ test_profile_type)
+ new_tenants = ['d434dd90-76ec-11e2-bcfd-0800200c9a67',
+ 'd434dd90-76ec-11e2-bcfd-0800200c9a68',
+ 'd434dd90-76ec-11e2-bcfd-0800200c9a69']
+ n1kv_db_v2.update_profile_binding(self.session,
+ test_profile_id,
+ new_tenants,
+ test_profile_type)
+
+ result = self.session.query(n1kv_models_v2.ProfileBinding).filter_by(
+ profile_type=test_profile_type,
+ profile_id=test_profile_id).all()
+ self.assertEqual(3, len(result))
+
def test_get_profile_binding(self):
test_tenant_id = "d434dd90-76ec-11e2-bcfd-0800200c9a66"
test_profile_id = "dd7b9741-76ec-11e2-bcfd-0800200c9a66"
profile_type="network"))
self.assertEqual(bindings.count(), 0)
+ def test_create_network_profile_with_old_add_tenant_fail(self):
+ data = self._prepare_net_profile_data('vlan')
+ data['network_profile']['add_tenant'] = 'tenant1'
+ net_p_req = self.new_create_request('network_profiles', data)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(400, res.status_int)
+
+ def test_create_network_profile_multi_tenants(self):
+ data = self._prepare_net_profile_data('vlan')
+ data['network_profile'][c_const.ADD_TENANTS] = ['tenant1', 'tenant2']
+ del data['network_profile']['tenant_id']
+ net_p_req = self.new_create_request('network_profiles', data)
+ net_p_req.environ['neutron.context'] = context.Context('',
+ self.tenant_id,
+ is_admin = True)
+ res = net_p_req.get_response(self.ext_api)
+ self.assertEqual(201, res.status_int)
+ net_p = self.deserialize(self.fmt, res)
+ db_session = db.get_session()
+ tenant_id = n1kv_db_v2.get_profile_binding(db_session, self.tenant_id,
+ net_p['network_profile']['id'])
+ tenant1 = n1kv_db_v2.get_profile_binding(db_session, 'tenant1',
+ net_p['network_profile']['id'])
+ tenant2 = n1kv_db_v2.get_profile_binding(db_session, 'tenant2',
+ net_p['network_profile']['id'])
+ self.assertIsNotNone(tenant_id)
+ self.assertIsNotNone(tenant1)
+ self.assertIsNotNone(tenant2)
+ return net_p
+
+ def test_update_network_profile_multi_tenants(self):
+ net_p = self.test_create_network_profile_multi_tenants()
+ data = {'network_profile': {c_const.ADD_TENANTS:
+ ['tenant1', 'tenant3']}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['network_profile']['id'])
+ update_req.environ['neutron.context'] = context.Context('',
+ self.tenant_id,
+ is_admin = True)
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(200, update_res.status_int)
+ db_session = db.get_session()
+ # current tenant_id should always present
+ tenant_id = n1kv_db_v2.get_profile_binding(db_session, self.tenant_id,
+ net_p['network_profile']['id'])
+ tenant1 = n1kv_db_v2.get_profile_binding(db_session, 'tenant1',
+ net_p['network_profile']['id'])
+ self.assertRaises(c_exc.ProfileTenantBindingNotFound,
+ n1kv_db_v2.get_profile_binding,
+ db_session, 'tenant2',
+ net_p['network_profile']['id'])
+ tenant3 = n1kv_db_v2.get_profile_binding(db_session, 'tenant3',
+ net_p['network_profile']['id'])
+ self.assertIsNotNone(tenant_id)
+ self.assertIsNotNone(tenant1)
+ self.assertIsNotNone(tenant3)
+ data = {'network_profile': {c_const.REMOVE_TENANTS: [self.tenant_id,
+ 'tenant1']}}
+ update_req = self.new_update_request('network_profiles',
+ data,
+ net_p['network_profile']['id'])
+ update_req.environ['neutron.context'] = context.Context('',
+ self.tenant_id,
+ is_admin = True)
+ update_res = update_req.get_response(self.ext_api)
+ self.assertEqual(200, update_res.status_int)
+ # current tenant_id should always present
+ tenant_id = n1kv_db_v2.get_profile_binding(db_session, self.tenant_id,
+ net_p['network_profile']['id'])
+ self.assertRaises(c_exc.ProfileTenantBindingNotFound,
+ n1kv_db_v2.get_profile_binding,
+ db_session, 'tenant1',
+ net_p['network_profile']['id'])
+ self.assertRaises(c_exc.ProfileTenantBindingNotFound,
+ n1kv_db_v2.get_profile_binding,
+ db_session, 'tenant2',
+ net_p['network_profile']['id'])
+ tenant3 = n1kv_db_v2.get_profile_binding(db_session, 'tenant3',
+ net_p['network_profile']['id'])
+ self.assertIsNotNone(tenant_id)
+ self.assertIsNotNone(tenant3)
+
class TestN1kvBasicGet(test_plugin.TestBasicGet,
N1kvPluginTestCase):