from neutron import manager
from neutron.plugins.cisco.common import cisco_constants as c_const
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
+from neutron.plugins.cisco.common import config as c_conf
from neutron.plugins.cisco.db import n1kv_db_v2
from neutron.plugins.cisco.db import n1kv_models_v2
from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.plugins.cisco import extensions
from neutron.plugins.cisco.extensions import n1kv
from neutron.plugins.cisco.extensions import network_profile
+from neutron.plugins.cisco.extensions import policy_profile
from neutron.plugins.cisco.n1kv import n1kv_client
from neutron.plugins.cisco.n1kv import n1kv_neutron_plugin
from neutron.tests.unit import _test_extension_portbindings as test_bindings
return []
+class PolicyProfileTestExtensionManager(object):
+
+ def get_resources(self):
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ policy_profile.RESOURCE_ATTRIBUTE_MAP)
+ return policy_profile.Policy_profile.get_resources()
+
+ def get_actions(self):
+ return []
+
+ def get_request_extensions(self):
+ return []
+
+
class N1kvPluginTestCase(test_plugin.NeutronDbPluginV2TestCase):
_plugin_name = ('neutron.plugins.cisco.n1kv.'
n1kv_db_v2.sync_vlan_allocations(db_session, net_p)
return net_p
- def setUp(self):
+ def setUp(self, ext_mgr=NetworkProfileTestExtensionManager()):
"""
Setup method for n1kv plugin tests.
n1kv_neutron_plugin.N1kvNeutronPluginV2._setup_vsm = _fake_setup_vsm
neutron_extensions.append_api_extensions_path(extensions.__path__)
- ext_mgr = NetworkProfileTestExtensionManager()
# Save the original RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
class TestN1kvPolicyProfiles(N1kvPluginTestCase):
+ def setUp(self):
+ """
+ Setup function for policy profile tests.
+
+ We need to use the policy profile extension manager for these
+ test cases, so call the super class setup, but pass in the
+ policy profile extension manager.
+ """
+ super(TestN1kvPolicyProfiles, self).setUp(
+ ext_mgr=PolicyProfileTestExtensionManager())
+
def test_populate_policy_profile(self):
client_patch = mock.patch(n1kv_client.__name__ + ".Client",
new=fake_client.TestClient)
db_session,
'00000000-0000-0000-0000-000000000003')
+ def _init_get_policy_profiles(self):
+ # Get the profiles
+ mock.patch(n1kv_client.__name__ + ".Client",
+ new=fake_client.TestClient).start()
+ instance = n1kv_neutron_plugin.N1kvNeutronPluginV2()
+ instance._populate_policy_profiles()
+ db_session = db.get_session()
+ return [
+ n1kv_db_v2.get_policy_profile(
+ db_session, '00000000-0000-0000-0000-000000000001'),
+ n1kv_db_v2.get_policy_profile(
+ db_session, '00000000-0000-0000-0000-000000000002')
+ ]
+
+ def _test_get_policy_profiles(self, expected_profiles, admin):
+ resource = 'policy_profiles'
+ if admin:
+ ctx = context.Context(user_id='admin',
+ tenant_id='tenant1',
+ is_admin=True)
+ else:
+ ctx = context.Context(user_id='non_admin',
+ tenant_id='tenant1',
+ is_admin=False)
+ res = self._list(resource, neutron_context=ctx)
+ self.assertEqual(len(expected_profiles), len(res[resource]))
+ profiles = sorted(res[resource])
+ for i in range(len(profiles)):
+ self.assertEqual(expected_profiles[i].id,
+ profiles[i]['id'])
+ self.assertEqual(expected_profiles[i].name,
+ profiles[i]['name'])
+
+ def test_get_profiles_unrestricted(self):
+ """
+ Test unrestricted policy profile retrieval.
+
+ Test getting policy profiles using the normal unrestricted
+ behavior. We set the flag and attempt to retrieve the port
+ profiles. It should work for both admin and non-admin.
+ """
+ # Get the profiles
+ profiles = self._init_get_policy_profiles()
+ # Set the restriction flag
+ c_conf.CONF.set_override('restrict_policy_profiles', False,
+ 'CISCO_N1K')
+ # Request the list using non-admin and verify it returns
+ self._test_get_policy_profiles(expected_profiles=profiles, admin=False)
+ # Request the list using admin and verify it returns
+ self._test_get_policy_profiles(expected_profiles=profiles, admin=True)
+
+ def test_get_profiles_restricted(self):
+ """
+ Test restricted policy profile retrieval.
+
+ Test getting policy profiles using the restricted behavior.
+ We set the flag and attempt to retrieve the port profiles. It
+ should work for admin and fail for non-admin.
+ """
+ # Get the profiles
+ profiles = self._init_get_policy_profiles()
+ # Set the restriction flag
+ c_conf.CONF.set_override('restrict_policy_profiles', True,
+ 'CISCO_N1K')
+ # Request the list using non-admin and verify it returns no data
+ self._test_get_policy_profiles(expected_profiles=[], admin=False)
+ # Request the list using admin and verify it returns
+ self._test_get_policy_profiles(expected_profiles=profiles, admin=True)
+
class TestN1kvNetworks(test_plugin.TestNetworksV2,
N1kvPluginTestCase):