raise c_exc.NetworkProfileNotFound(profile=id)
-def _get_network_profiles(**kwargs):
+def _get_network_profiles(db_session=None, physical_network=None):
"""
Retrieve all network profiles.
network is specified. If no physical network is specified, return
all network profiles.
"""
- db_session = db.get_session()
- if "physical_network" in kwargs:
+ db_session = db_session or db.get_session()
+ if physical_network:
return (db_session.query(n1kv_models_v2.NetworkProfile).
- filter_by(physical_network=kwargs["physical_network"]))
+ filter_by(physical_network=physical_network))
return db_session.query(n1kv_models_v2.NetworkProfile)
raise c_exc.PolicyProfileIdNotFound(profile_id=id)
-def create_profile_binding(tenant_id, profile_id, profile_type):
+def create_profile_binding(db_session, tenant_id, profile_id, profile_type):
"""Create Network/Policy Profile association with a tenant."""
+ db_session = db_session or db.get_session()
if profile_type not in ["network", "policy"]:
raise q_exc.NeutronException(_("Invalid profile type"))
- if _profile_binding_exists(tenant_id, profile_id, profile_type):
- return get_profile_binding(tenant_id, profile_id)
+ if _profile_binding_exists(db_session,
+ tenant_id,
+ profile_id,
+ profile_type):
+ return get_profile_binding(db_session, tenant_id, profile_id)
- db_session = db.get_session()
with db_session.begin(subtransactions=True):
binding = n1kv_models_v2.ProfileBinding(profile_type=profile_type,
profile_id=profile_id,
return binding
-def _profile_binding_exists(tenant_id, profile_id, profile_type):
- db_session = db.get_session()
+def _profile_binding_exists(db_session, tenant_id, profile_id, profile_type):
LOG.debug(_("_profile_binding_exists()"))
return (db_session.query(n1kv_models_v2.ProfileBinding).
filter_by(tenant_id=tenant_id, profile_id=profile_id,
profile_type=profile_type).first())
-def _get_profile_binding(tenant_id, profile_id):
- LOG.debug(_("_get_profile_binding"))
- db_session = db.get_session()
- return (db_session.query(n1kv_models_v2.ProfileBinding).filter_by(
- tenant_id=tenant_id, profile_id=profile_id).one())
-
-
-def get_profile_binding(tenant_id, profile_id):
+def get_profile_binding(db_session, tenant_id, profile_id):
"""Get Network/Policy Profile - Tenant binding."""
LOG.debug(_("get_profile_binding()"))
try:
- return _get_profile_binding(tenant_id, profile_id)
+ return (db_session.query(n1kv_models_v2.ProfileBinding).filter_by(
+ tenant_id=tenant_id, profile_id=profile_id).one())
except exc.NoResultFound:
c_exc.ProfileTenantBindingNotFound(profile_id=profile_id)
-def delete_profile_binding(tenant_id, profile_id):
+def delete_profile_binding(db_session, tenant_id, profile_id):
"""Delete Policy Binding."""
LOG.debug(_("delete_profile_binding()"))
- db_session = db.get_session()
+ db_session = db_session or db.get_session()
try:
- binding = get_profile_binding(tenant_id, profile_id)
+ binding = get_profile_binding(db_session, tenant_id, profile_id)
with db_session.begin(subtransactions=True):
db_session.delete(binding)
except c_exc.ProfileTenantBindingNotFound:
return
-def _get_profile_bindings(profile_type=None):
+def _get_profile_bindings(db_session, profile_type=None):
"""
Retrieve a list of profile bindings.
profile types.
"""
LOG.debug(_("_get_profile_bindings()"))
- db_session = db.get_session()
if profile_type:
profile_bindings = (db_session.query(n1kv_models_v2.ProfileBinding).
filter_by(profile_type=profile_type))
"""
if context.is_admin:
profile_bindings = _get_profile_bindings(
+ context.session,
profile_type=c_const.NETWORK)
return [self._make_profile_bindings_dict(pb)
for pb in profile_bindings]
p = network_profile["network_profile"]
self._validate_network_profile_args(context, p)
net_profile = create_network_profile(context.session, p)
- create_profile_binding(context.tenant_id,
+ create_profile_binding(context.session,
+ context.tenant_id,
net_profile.id,
c_const.NETWORK)
if p.get("add_tenant"):
- self.add_network_profile_tenant(net_profile.id, p["add_tenant"])
+ self.add_network_profile_tenant(context.session,
+ net_profile.id,
+ p["add_tenant"])
return self._make_network_profile_dict(net_profile)
def delete_network_profile(self, context, id):
"""
p = network_profile["network_profile"]
if context.is_admin and "add_tenant" in p:
- self.add_network_profile_tenant(id, p["add_tenant"])
+ self.add_network_profile_tenant(context.session,
+ id,
+ p["add_tenant"])
return self._make_network_profile_dict(get_network_profile(
context.session, id))
if context.is_admin and "remove_tenant" in p:
- delete_profile_binding(p["remove_tenant"], id)
+ delete_profile_binding(context.session, p["remove_tenant"], id)
return self._make_network_profile_dict(get_network_profile(
context.session, id))
return self._make_network_profile_dict(
NetworkProfile,
context.tenant_id)
- def add_network_profile_tenant(self, network_profile_id, tenant_id):
+ def add_network_profile_tenant(self,
+ db_session,
+ network_profile_id,
+ tenant_id):
"""
Add a tenant to a network profile.
+ :param db_session: database session
:param network_profile_id: UUID representing network profile
:param tenant_id: UUID representing the tenant
:returns: profile binding object
"""
- return create_profile_binding(tenant_id,
+ return create_profile_binding(db_session,
+ tenant_id,
network_profile_id,
c_const.NETWORK)
segment_type = net_p["segment_type"].lower()
if segment_type == c_const.NETWORK_TYPE_VLAN:
profiles = _get_network_profiles(
- physical_network=net_p["physical_network"])
+ db_session=context.session,
+ physical_network=net_p["physical_network"]
+ )
else:
- profiles = _get_network_profiles()
+ profiles = _get_network_profiles(db_session=context.session)
if profiles:
for profile in profiles:
name = profile.name
"""
if context.is_admin:
profile_bindings = _get_profile_bindings(
+ context.session,
profile_type=c_const.POLICY)
return [self._make_profile_bindings_dict(pb)
for pb in profile_bindings]
"""
p = policy_profile["policy_profile"]
if context.is_admin and "add_tenant" in p:
- self.add_policy_profile_tenant(id, p["add_tenant"])
+ self.add_policy_profile_tenant(context.session,
+ id,
+ p["add_tenant"])
return self._make_policy_profile_dict(get_policy_profile(
context.session, id))
if context.is_admin and "remove_tenant" in p:
- delete_profile_binding(p["remove_tenant"], id)
+ delete_profile_binding(context.session, p["remove_tenant"], id)
return self._make_policy_profile_dict(get_policy_profile(
context.session, id))
return self._make_policy_profile_dict(
update_policy_profile(context.session, id, p))
- def add_policy_profile_tenant(self, policy_profile_id, tenant_id):
+ def add_policy_profile_tenant(self,
+ db_session,
+ policy_profile_id,
+ tenant_id):
"""
Add a tenant to a policy profile binding.
+ :param db_session: database session
:param policy_profile_id: UUID representing policy profile
:param tenant_id: UUID representing the tenant
:returns: profile binding object
"""
- return create_profile_binding(tenant_id,
+ return create_profile_binding(db_session,
+ tenant_id,
policy_profile_id,
c_const.POLICY)
:param policy_profile_id: UUID representing policy profile
:param tenant_id: UUID representing the tenant
"""
- delete_profile_binding(tenant_id, policy_profile_id)
+ delete_profile_binding(None, tenant_id, policy_profile_id)
def _delete_policy_profile(self, policy_profile_id):
"""Delete policy profile and associated binding."""
tenant_id = tenant_id or c_const.TENANT_ID_NOT_SET
if not self._policy_profile_exists(policy_profile_id):
create_policy_profile(policy_profile)
- create_profile_binding(tenant_id, policy_profile["id"], c_const.POLICY)
+ create_profile_binding(None,
+ tenant_id,
+ policy_profile["id"],
+ c_const.POLICY)
db.configure_db()
self.session = db.get_session()
n1kv_db_v2.sync_vlan_allocations(self.session, VLAN_RANGES)
-
- def tearDown(self):
- super(VlanAllocationsTest, self).tearDown()
+ self.addCleanup(db.clear_db)
def test_sync_vlan_allocations_outside_segment_range(self):
self.assertRaises(c_exc.VlanIDNotFound,
db.configure_db()
self.session = db.get_session()
n1kv_db_v2.sync_vxlan_allocations(self.session, VXLAN_RANGES)
-
- def tearDown(self):
- super(VxlanAllocationsTest, self).tearDown()
+ self.addCleanup(db.clear_db)
def test_sync_vxlan_allocations_outside_segment_range(self):
self.assertIsNone(n1kv_db_v2.get_vxlan_allocation(self.session,
super(NetworkBindingsTest, self).setUp()
db.configure_db()
self.session = db.get_session()
-
- def tearDown(self):
- super(NetworkBindingsTest, self).tearDown()
+ self.addCleanup(db.clear_db)
def test_add_network_binding(self):
with self.network() as network:
super(NetworkProfileTests, self).setUp()
db.configure_db()
self.session = db.get_session()
-
- def tearDown(self):
- super(NetworkProfileTests, self).tearDown()
+ self.addCleanup(db.clear_db)
def test_create_network_profile(self):
_db_profile = n1kv_db_v2.create_network_profile(self.session,
[n1kv_db_v2.create_network_profile(self.session, p)
for p in test_profiles]
# TODO(abhraut): Fix this test to work with real tenant_td
- profiles = n1kv_db_v2._get_network_profiles()
+ profiles = n1kv_db_v2._get_network_profiles(db_session=self.session)
self.assertEqual(len(test_profiles), len(list(profiles)))
super(PolicyProfileTests, self).setUp()
db.configure_db()
self.session = db.get_session()
-
- def tearDown(self):
- super(PolicyProfileTests, self).tearDown()
+ self.addCleanup(db.clear_db)
def test_create_policy_profile(self):
_db_profile = n1kv_db_v2.create_policy_profile(TEST_POLICY_PROFILE)
super(ProfileBindingTests, self).setUp()
db.configure_db()
self.session = db.get_session()
-
- def tearDown(self):
- super(ProfileBindingTests, self).tearDown()
+ self.addCleanup(db.clear_db)
def _create_test_binding_if_not_there(self, tenant_id, profile_id,
profile_type):
tenant_id=tenant_id,
profile_id=profile_id).one())
except s_exc.NoResultFound:
- _binding = n1kv_db_v2.create_profile_binding(tenant_id,
+ _binding = n1kv_db_v2.create_profile_binding(self.session,
+ tenant_id,
profile_id,
profile_type)
return _binding
test_tenant_id = "d434dd90-76ec-11e2-bcfd-0800200c9a66"
test_profile_id = "dd7b9741-76ec-11e2-bcfd-0800200c9a66"
test_profile_type = "network"
- n1kv_db_v2.create_profile_binding(test_tenant_id, test_profile_id,
+ n1kv_db_v2.create_profile_binding(self.session,
+ test_tenant_id,
+ test_profile_id,
test_profile_type)
try:
self.session.query(n1kv_models_v2.ProfileBinding).filter_by(
self._create_test_binding_if_not_there(test_tenant_id,
test_profile_id,
test_profile_type)
- binding = n1kv_db_v2.get_profile_binding(test_tenant_id,
+ binding = n1kv_db_v2.get_profile_binding(self.session,
+ test_tenant_id,
test_profile_id)
self.assertEqual(binding.tenant_id, test_tenant_id)
self.assertEqual(binding.profile_id, test_profile_id)
self._create_test_binding_if_not_there(test_tenant_id,
test_profile_id,
test_profile_type)
- n1kv_db_v2.delete_profile_binding(test_tenant_id, test_profile_id)
+ n1kv_db_v2.delete_profile_binding(self.session,
+ test_tenant_id,
+ test_profile_id)
q = (self.session.query(n1kv_models_v2.ProfileBinding).filter_by(
profile_type=test_profile_type,
tenant_id=test_tenant_id,
ctx.tenant_id = "d434dd90-76ec-11e2-bcfd-0800200c9a66"
test_profile_id = "AAAAAAAA-76ec-11e2-bcfd-0800200c9a66"
test_profile_type = "policy"
- n1kv_db_v2.create_profile_binding(cisco_constants.TENANT_ID_NOT_SET,
+ n1kv_db_v2.create_profile_binding(self.session,
+ cisco_constants.TENANT_ID_NOT_SET,
test_profile_id,
test_profile_type)
network_profile = {"network_profile": TEST_NETWORK_PROFILE}
test_network_profile = self.create_network_profile(ctx,
network_profile)
- binding = n1kv_db_v2.get_profile_binding(ctx.tenant_id,
+ binding = n1kv_db_v2.get_profile_binding(self.session,
+ ctx.tenant_id,
test_profile_id)
self.assertIsNone(n1kv_db_v2.get_profile_binding(
+ self.session,
cisco_constants.TENANT_ID_NOT_SET,
test_profile_id))
self.assertNotEqual(binding.tenant_id,