"extension:provider_network:set": "rule:admin_only",
"extension:router:view": "rule:regular_user",
- "extension:router:set": "rule:admin_only",
"extension:port_binding:view": "rule:admin_only",
"extension:port_binding:set": "rule:admin_only",
"get_network": "rule:admin_or_owner or rule:shared or rule:external",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
+ "create_network:provider:network_type": "rule:admin_only",
+ "create_network:provider:physical_network": "rule:admin_only",
+ "create_network:provider:segmentation_id": "rule:admin_only",
"update_network": "rule:admin_or_owner",
+ "update_network:provider:network_type": "rule:admin_only",
+ "update_network:provider:physical_network": "rule:admin_only",
+ "update_network:provider:segmentation_id": "rule:admin_only",
"delete_network": "rule:admin_or_owner",
"create_port": "",
from quantum.openstack.common import uuidutils
from quantum import policy
-
LOG = logging.getLogger(__name__)
"extension:router:view",
network)
- def _enforce_l3_set_auth(self, context, network):
- return policy.enforce(context,
- "extension:router:set",
- network)
-
def _network_is_external(self, context, net_id):
try:
context.session.query(ExternalNetwork).filter_by(
if not external_set:
return
- self._enforce_l3_set_auth(context, net_data)
-
if external:
# expects to be called within a plugin's session
context.session.add(ExternalNetwork(network_id=net_id))
if not attributes.is_attr_set(new_value):
return
- self._enforce_l3_set_auth(context, net_data)
existing_value = self._network_is_external(context, net_id)
if existing_value == new_value:
NETWORK_TYPE: {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': attributes.ATTR_NOT_SPECIFIED,
+ 'enforce_policy': True,
'is_visible': True},
PHYSICAL_NETWORK: {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
+ 'enforce_policy': True,
'is_visible': True},
SEGMENTATION_ID: {'allow_post': True, 'allow_put': True,
'convert_to': int,
+ 'enforce_policy': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'is_visible': True},
}
return [extensions.ResourceExtension(COLLECTION_NAME,
controller,
attr_map=attr_map)]
+
+ def get_extended_resources(self, version):
+ if version == "2.0":
+ return dict(RESOURCE_ATTRIBUTE_MAP.items())
+ else:
+ return {}
class RoleCheck(Check):
def __call__(self, target, creds):
"""Check that there is a matching role in the cred dict."""
-
return self.match.lower() in [x.lower() for x in creds['roles']]
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def _extend_port_dict_binding(self, context, port):
if self._check_view_auth(context, port, self.binding_view):
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def _parse_network_vlan_ranges(self):
self._network_vlan_ranges = {}
for entry in cfg.CONF.HYPERV.network_vlan_ranges:
# Provider specific network creation
p.create_network(session, attrs)
- if network_type_set:
- self._enforce_set_auth(context, attrs, self.network_set)
-
def create_network(self, context, network):
session = context.session
with session.begin(subtransactions=True):
def update_network(self, context, id, network):
network_attrs = network['network']
self._check_provider_update(context, network_attrs)
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, network_attrs, self.network_set)
session = context.session
with session.begin(subtransactions=True):
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
self._add_network(physical_network)
self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
segmentation_id_set):
return (None, None, None)
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.network_set)
-
if not network_type_set:
msg = _("provider:network_type required")
raise q_exc.InvalidInput(error_message=msg)
segmentation_id_set):
return
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.network_set)
-
msg = _("Plugin does not support updating provider attributes")
raise q_exc.InvalidInput(error_message=msg)
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource."""
request = {}
novazone_cluster_map = {}
provider_network_view = "extension:provider_network:view"
- provider_network_set = "extension:provider_network:set"
- port_security_enabled_create = "create_port:port_security_enabled"
port_security_enabled_update = "update_port:port_security_enabled"
def __init__(self, loglevel=None):
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _enforce_set_auth(self, context, resource, action):
- return policy.enforce(context, action, resource)
-
def _handle_provider_create(self, context, attrs):
# NOTE(salvatore-orlando): This method has been borrowed from
# the OpenvSwtich plugin, altough changed to match NVP specifics.
segmentation_id_set):
return
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.provider_network_set)
err_msg = None
if not network_type_set:
err_msg = _("%s required") % pnet.NETWORK_TYPE
# pass the value to the policy engine when the port is
# ATTR_NOT_SPECIFIED is for the case where a port is created on a
# shared network that is not owned by the tenant.
- # TODO(arosen) fix policy engine to do this for us automatically.
- if attr.is_attr_set(port['port'].get(psec.PORTSECURITY)):
- self._enforce_set_auth(context, port,
- self.port_security_enabled_create)
port_data = port['port']
notify_dhcp_agent = False
with context.session.begin(subtransactions=True):
return port_data
def update_port(self, context, id, port):
- if attr.is_attr_set(port['port'].get(psec.PORTSECURITY)):
- self._enforce_set_auth(context, port,
- self.port_security_enabled_update)
delete_security_groups = self._check_update_deletes_security_groups(
port)
has_security_groups = self._check_update_has_security_groups(port)
def create_qos_queue(self, context, qos_queue, check_policy=True):
q = qos_queue.get('qos_queue')
- if check_policy:
- self._enforce_set_auth(context, q, ext_qos.qos_queue_create)
self._validate_qos_queue(context, q)
q['id'] = nvplib.create_lqueue(self.cluster,
self._nvp_lqueue(q))
def get_extended_resources(self, version):
if version == "2.0":
- return EXTENDED_ATTRIBUTES_2_0
+ return dict(EXTENDED_ATTRIBUTES_2_0.items() +
+ RESOURCE_ATTRIBUTE_MAP.items())
else:
return {}
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _enforce_set_auth(self, context, resource, action):
- policy.enforce(context, action, resource)
-
def _extend_network_dict_provider(self, context, network):
if self._check_view_auth(context, network, self.network_view):
binding = ovs_db_v2.get_network_binding(context.session,
segmentation_id_set):
return (None, None, None)
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.network_set)
-
if not network_type_set:
msg = _("provider:network_type required")
raise q_exc.InvalidInput(error_message=msg)
segmentation_id_set):
return
- # Authorize before exposing plugin details to client
- self._enforce_set_auth(context, attrs, self.network_set)
-
msg = _("Plugin does not support updating provider attributes")
raise q_exc.InvalidInput(error_message=msg)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Salvatore Orlando, VMware
+#
+
+import mock
+from oslo.config import cfg
+from webob import exc as web_exc
+import webtest
+
+from quantum.api import extensions
+from quantum.api.v2 import attributes
+from quantum.api.v2 import router
+from quantum import context
+from quantum.extensions import providernet as pnet
+from quantum.manager import QuantumManager
+from quantum.openstack.common import uuidutils
+from quantum.tests.unit import test_api_v2
+from quantum.tests.unit import test_extensions
+from quantum.tests.unit import testlib_api
+
+
+class ProviderExtensionManager(object):
+
+ def get_resources(self):
+ return []
+
+ def get_actions(self):
+ return []
+
+ def get_request_extensions(self):
+ return []
+
+ def get_extended_resources(self, version):
+ return pnet.get_extended_resources(version)
+
+
+class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
+ fmt = 'json'
+
+ def setUp(self):
+ super(ProvidernetExtensionTestCase, self).setUp()
+
+ plugin = 'quantum.quantum_plugin_base_v2.QuantumPluginBaseV2'
+ # Ensure 'stale' patched copies of the plugin are never returned
+ QuantumManager._instance = None
+
+ # Ensure existing ExtensionManager is not used
+ extensions.PluginAwareExtensionManager._instance = None
+
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
+
+ # Update the plugin and extensions path
+ cfg.CONF.set_override('core_plugin', plugin)
+ cfg.CONF.set_override('allow_pagination', True)
+ cfg.CONF.set_override('allow_sorting', True)
+ cfg.CONF.set_override('quota_network', -1, group='QUOTAS')
+ self._plugin_patcher = mock.patch(plugin, autospec=True)
+ self.plugin = self._plugin_patcher.start()
+ # Instantiate mock plugin and enable the 'provider' extension
+ QuantumManager.get_plugin().supported_extension_aliases = (
+ ["provider"])
+ ext_mgr = ProviderExtensionManager()
+ self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
+ self.addCleanup(self._plugin_patcher.stop)
+ self.addCleanup(cfg.CONF.reset)
+ self.addCleanup(self._restore_attribute_map)
+ self.api = webtest.TestApp(router.APIRouter())
+
+ def _restore_attribute_map(self):
+ # Restore the global RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
+ def _prepare_net_data(self):
+ return {'name': 'net1',
+ pnet.NETWORK_TYPE: 'sometype',
+ pnet.PHYSICAL_NETWORK: 'physnet',
+ pnet.SEGMENTATION_ID: 666}
+
+ def _put_network_with_provider_attrs(self, ctx, expect_errors=False):
+ data = self._prepare_net_data()
+ env = {'quantum.context': ctx}
+ instance = self.plugin.return_value
+ instance.get_network.return_value = {'tenant_id': ctx.tenant_id,
+ 'shared': False}
+ net_id = uuidutils.generate_uuid()
+ res = self.api.put(test_api_v2._get_path('networks',
+ id=net_id,
+ fmt=self.fmt),
+ self.serialize({'network': data}),
+ extra_environ=env,
+ expect_errors=expect_errors)
+ return res, data, net_id
+
+ def _post_network_with_provider_attrs(self, ctx, expect_errors=False):
+ data = self._prepare_net_data()
+ env = {'quantum.context': ctx}
+ res = self.api.post(test_api_v2._get_path('networks', fmt=self.fmt),
+ self.serialize({'network': data}),
+ content_type='application/' + self.fmt,
+ extra_environ=env,
+ expect_errors=expect_errors)
+ return res, data
+
+ def test_network_create_with_provider_attrs(self):
+ ctx = context.get_admin_context()
+ ctx.tenant_id = 'an_admin'
+ res, data = self._post_network_with_provider_attrs(ctx)
+ instance = self.plugin.return_value
+ exp_input = {'network': data}
+ exp_input['network'].update({'admin_state_up': True,
+ 'tenant_id': 'an_admin',
+ 'shared': False})
+ instance.create_network.assert_called_with(mock.ANY,
+ network=exp_input)
+ self.assertEqual(res.status_int, web_exc.HTTPCreated.code)
+
+ def test_network_update_with_provider_attrs(self):
+ ctx = context.get_admin_context()
+ ctx.tenant_id = 'an_admin'
+ res, data, net_id = self._put_network_with_provider_attrs(ctx)
+ instance = self.plugin.return_value
+ exp_input = {'network': data}
+ instance.update_network.assert_called_with(mock.ANY,
+ net_id,
+ network=exp_input)
+ self.assertEqual(res.status_int, web_exc.HTTPOk.code)
+
+ def test_network_create_with_provider_attrs_noadmin_returns_403(self):
+ tenant_id = 'no_admin'
+ ctx = context.Context('', tenant_id, is_admin=False)
+ res, _1 = self._post_network_with_provider_attrs(ctx, True)
+ self.assertEqual(res.status_int, web_exc.HTTPForbidden.code)
+
+ def test_network_update_with_provider_attrs_noadmin_returns_404(self):
+ tenant_id = 'no_admin'
+ ctx = context.Context('', tenant_id, is_admin=False)
+ res, _1, _2 = self._put_network_with_provider_attrs(ctx, True)
+ self.assertEqual(res.status_int, web_exc.HTTPNotFound.code)