from neutron.extensions import portbindings
from neutron.i18n import _LW
from neutron import manager
+from neutron.plugins.common import utils as p_utils
from neutron.quota import resource_registry
+
LOG = logging.getLogger(__name__)
"""Perform port operations taking care of concurrency issues."""
try:
if action == 'create_port':
- return plugin.create_port(context, port)
+ return p_utils.create_port(plugin, context, port)
elif action == 'update_port':
return plugin.update_port(context, port['id'], port)
else:
from oslo_log import log as logging
from oslo_utils import uuidutils
import six
+import webob.exc
from neutron.common import constants
from neutron.common import exceptions as n_exc
'allocation_pools': 'allocation_pool',
'fixed_ips': 'fixed_ip',
'extensions': 'extension'}
+
+
+def fill_default_value(attr_info, res_dict,
+ exc_cls=ValueError,
+ check_allow_post=True):
+ for attr, attr_vals in six.iteritems(attr_info):
+ if attr_vals['allow_post']:
+ if ('default' not in attr_vals and
+ attr not in res_dict):
+ msg = _("Failed to parse request. Required "
+ "attribute '%s' not specified") % attr
+ raise exc_cls(msg)
+ res_dict[attr] = res_dict.get(attr,
+ attr_vals.get('default'))
+ elif check_allow_post:
+ if attr in res_dict:
+ msg = _("Attribute '%s' not allowed in POST") % attr
+ raise exc_cls(msg)
+
+
+def convert_value(attr_info, res_dict, exc_cls=ValueError):
+ for attr, attr_vals in six.iteritems(attr_info):
+ if (attr not in res_dict or
+ res_dict[attr] is ATTR_NOT_SPECIFIED):
+ continue
+ # Convert values if necessary
+ if 'convert_to' in attr_vals:
+ res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
+ # Check that configured values are correct
+ if 'validate' not in attr_vals:
+ continue
+ for rule in attr_vals['validate']:
+ res = validators[rule](res_dict[attr], attr_vals['validate'][rule])
+ if res:
+ msg_dict = dict(attr=attr, reason=res)
+ msg = _("Invalid input for %(attr)s. "
+ "Reason: %(reason)s.") % msg_dict
+ raise exc_cls(msg)
+
+
+def populate_tenant_id(context, res_dict, attr_info, is_create):
+ if (('tenant_id' in res_dict and
+ res_dict['tenant_id'] != context.tenant_id and
+ not context.is_admin)):
+ msg = _("Specifying 'tenant_id' other than authenticated "
+ "tenant in request requires admin privileges")
+ raise webob.exc.HTTPBadRequest(msg)
+
+ if is_create and 'tenant_id' not in res_dict:
+ if context.tenant_id:
+ res_dict['tenant_id'] = context.tenant_id
+ elif 'tenant_id' in attr_info:
+ msg = _("Running without keystone AuthN requires "
+ "that tenant_id is specified")
+ raise webob.exc.HTTPBadRequest(msg)
+
+
+def verify_attributes(res_dict, attr_info):
+ extra_keys = set(res_dict.keys()) - set(attr_info.keys())
+ if extra_keys:
+ msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
+ raise webob.exc.HTTPBadRequest(msg)
self._send_nova_notification(action, orig_object_copy, result)
return result
- @staticmethod
- def _populate_tenant_id(context, res_dict, attr_info, is_create):
- if (('tenant_id' in res_dict and
- res_dict['tenant_id'] != context.tenant_id and
- not context.is_admin)):
- msg = _("Specifying 'tenant_id' other than authenticated "
- "tenant in request requires admin privileges")
- raise webob.exc.HTTPBadRequest(msg)
-
- if is_create and 'tenant_id' not in res_dict:
- if context.tenant_id:
- res_dict['tenant_id'] = context.tenant_id
- elif 'tenant_id' in attr_info:
- msg = _("Running without keystone AuthN requires "
- "that tenant_id is specified")
- raise webob.exc.HTTPBadRequest(msg)
-
@staticmethod
def prepare_request_body(context, body, is_create, resource, attr_info,
allow_bulk=False):
msg = _("Unable to find '%s' in request body") % resource
raise webob.exc.HTTPBadRequest(msg)
- Controller._populate_tenant_id(context, res_dict, attr_info, is_create)
- Controller._verify_attributes(res_dict, attr_info)
+ attributes.populate_tenant_id(context, res_dict, attr_info, is_create)
+ attributes.verify_attributes(res_dict, attr_info)
if is_create: # POST
- for attr, attr_vals in six.iteritems(attr_info):
- if attr_vals['allow_post']:
- if ('default' not in attr_vals and
- attr not in res_dict):
- msg = _("Failed to parse request. Required "
- "attribute '%s' not specified") % attr
- raise webob.exc.HTTPBadRequest(msg)
- res_dict[attr] = res_dict.get(attr,
- attr_vals.get('default'))
- else:
- if attr in res_dict:
- msg = _("Attribute '%s' not allowed in POST") % attr
- raise webob.exc.HTTPBadRequest(msg)
+ attributes.fill_default_value(attr_info, res_dict,
+ webob.exc.HTTPBadRequest)
else: # PUT
for attr, attr_vals in six.iteritems(attr_info):
if attr in res_dict and not attr_vals['allow_put']:
msg = _("Cannot update read-only attribute %s") % attr
raise webob.exc.HTTPBadRequest(msg)
- for attr, attr_vals in six.iteritems(attr_info):
- if (attr not in res_dict or
- res_dict[attr] is attributes.ATTR_NOT_SPECIFIED):
- continue
- # Convert values if necessary
- if 'convert_to' in attr_vals:
- res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
- # Check that configured values are correct
- if 'validate' not in attr_vals:
- continue
- for rule in attr_vals['validate']:
- res = attributes.validators[rule](res_dict[attr],
- attr_vals['validate'][rule])
- if res:
- msg_dict = dict(attr=attr, reason=res)
- msg = _("Invalid input for %(attr)s. "
- "Reason: %(reason)s.") % msg_dict
- raise webob.exc.HTTPBadRequest(msg)
+ attributes.convert_value(attr_info, res_dict, webob.exc.HTTPBadRequest)
return body
- @staticmethod
- def _verify_attributes(res_dict, attr_info):
- extra_keys = set(res_dict.keys()) - set(attr_info.keys())
- if extra_keys:
- msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
- raise webob.exc.HTTPBadRequest(msg)
-
def _validate_network_tenant_ownership(self, request, resource_item):
# TODO(salvatore-orlando): consider whether this check can be folded
# in the policy engine
from neutron.i18n import _LI, _LE
from neutron import manager
from neutron.plugins.common import constants
+from neutron.plugins.common import utils as p_utils
LOG = logging.getLogger(__name__)
def _create_router_gw_port(self, context, router, network_id, ext_ips):
# Port has no 'tenant-id', as it is hidden from user
- gw_port = self._core_plugin.create_port(context.elevated(), {
- 'port': {'tenant_id': '', # intentionally not set
+ port_data = {'tenant_id': '', # intentionally not set
'network_id': network_id,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': ext_ips or attributes.ATTR_NOT_SPECIFIED,
'device_id': router['id'],
'device_owner': DEVICE_OWNER_ROUTER_GW,
'admin_state_up': True,
- 'name': ''}})
+ 'name': ''}
+ gw_port = p_utils.create_port(self._core_plugin,
+ context.elevated(), {'port': port_data})
if not gw_port['fixed_ips']:
LOG.debug('No IPs available for external network %s',
port['port_id'], {'port':
{'fixed_ips': fixed_ips}}), [subnet], False
- return self._core_plugin.create_port(context, {
- 'port':
- {'tenant_id': subnet['tenant_id'],
- 'network_id': subnet['network_id'],
- 'fixed_ips': [fixed_ip],
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'admin_state_up': True,
- 'device_id': router.id,
- 'device_owner': owner,
- 'name': ''}}), [subnet], True
+ port_data = {'tenant_id': subnet['tenant_id'],
+ 'network_id': subnet['network_id'],
+ 'fixed_ips': [fixed_ip],
+ 'admin_state_up': True,
+ 'device_id': router.id,
+ 'device_owner': owner,
+ 'name': ''}
+ return p_utils.create_port(self._core_plugin, context,
+ {'port': port_data}), [subnet], True
@staticmethod
def _make_router_interface_info(
port = {'tenant_id': '', # tenant intentionally not set
'network_id': f_net_id,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': fip_id,
'device_owner': DEVICE_OWNER_FLOATINGIP,
'status': l3_constants.PORT_STATUS_NOTAPPLICABLE,
'name': ''}
-
if fip.get('floating_ip_address'):
port['fixed_ips'] = [
{'ip_address': fip['floating_ip_address']}]
if fip.get('subnet_id'):
port['fixed_ips'] = [
{'subnet_id': fip['subnet_id']}]
- external_port = self._core_plugin.create_port(context.elevated(),
- {'port': port})
+ # 'status' in port dict could not be updated by default, use
+ # check_allow_post to stop the verification of system
+ external_port = p_utils.create_port(self._core_plugin,
+ context.elevated(),
+ {'port': port},
+ check_allow_post=False)
# Ensure IPv4 addresses are allocated on external port
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
if not external_ipv4_ips:
from neutron.i18n import _LI
from neutron import manager
from neutron.plugins.common import constants
+from neutron.plugins.common import utils as p_utils
LOG = logging.getLogger(__name__)
if not f_port:
LOG.info(_LI('Agent Gateway port does not exist,'
' so create one: %s'), f_port)
- agent_port = self._core_plugin.create_port(
- context,
- {'port': {'tenant_id': '',
- 'network_id': network_id,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
- 'device_id': l3_agent_db['id'],
- 'device_owner': DEVICE_OWNER_AGENT_GW,
- 'binding:host_id': host,
- 'admin_state_up': True,
- 'name': ''}})
+ port_data = {'tenant_id': '',
+ 'network_id': network_id,
+ 'device_id': l3_agent_db['id'],
+ 'device_owner': DEVICE_OWNER_AGENT_GW,
+ 'binding:host_id': host,
+ 'admin_state_up': True,
+ 'name': ''}
+ agent_port = p_utils.create_port(self._core_plugin, context,
+ {'port': port_data})
if agent_port:
self._populate_subnets_for_ports(context, [agent_port])
return agent_port
def _add_csnat_router_interface_port(
self, context, router, network_id, subnet_id, do_pop=True):
"""Add SNAT interface to the specified router and subnet."""
- snat_port = self._core_plugin.create_port(
- context,
- {'port': {'tenant_id': '',
- 'network_id': network_id,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'fixed_ips': [{'subnet_id': subnet_id}],
- 'device_id': router.id,
- 'device_owner': DEVICE_OWNER_DVR_SNAT,
- 'admin_state_up': True,
- 'name': ''}})
+ port_data = {'tenant_id': '',
+ 'network_id': network_id,
+ 'fixed_ips': [{'subnet_id': subnet_id}],
+ 'device_id': router.id,
+ 'device_owner': DEVICE_OWNER_DVR_SNAT,
+ 'admin_state_up': True,
+ 'name': ''}
+ snat_port = p_utils.create_port(self._core_plugin, context,
+ {'port': port_data})
if not snat_port:
msg = _("Unable to create the SNAT Interface Port")
raise n_exc.BadRequest(resource='router', msg=msg)
from neutron.extensions import portbindings
from neutron.extensions import providernet
from neutron.i18n import _LI
+from neutron.plugins.common import utils as p_utils
+
VR_ID_RANGE = set(range(1, 255))
MAX_ALLOCATION_TRIES = 10
context, ha_network.network_id, router.id)
def _create_ha_subnet(self, context, network_id, tenant_id):
- args = {'subnet':
- {'network_id': network_id,
- 'tenant_id': '',
- 'name': constants.HA_SUBNET_NAME % tenant_id,
- 'ip_version': 4,
- 'cidr': cfg.CONF.l3_ha_net_cidr,
- 'enable_dhcp': False,
- 'host_routes': attributes.ATTR_NOT_SPECIFIED,
- 'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
- 'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
- 'gateway_ip': None}}
- return self._core_plugin.create_subnet(context, args)
+ args = {'network_id': network_id,
+ 'tenant_id': '',
+ 'name': constants.HA_SUBNET_NAME % tenant_id,
+ 'ip_version': 4,
+ 'cidr': cfg.CONF.l3_ha_net_cidr,
+ 'enable_dhcp': False,
+ 'gateway_ip': None}
+ return p_utils.create_subnet(self._core_plugin, context,
+ {'subnet': args})
def _create_ha_network_tenant_binding(self, context, tenant_id,
network_id):
{'name': constants.HA_NETWORK_NAME % tenant_id,
'tenant_id': '',
'shared': False,
- 'admin_state_up': True,
- 'status': constants.NET_STATUS_ACTIVE}}
+ 'admin_state_up': True}}
self._add_ha_network_settings(args['network'])
+ network = p_utils.create_network(self._core_plugin, admin_ctx, args)
- network = self._core_plugin.create_network(admin_ctx, args)
try:
ha_network = self._create_ha_network_tenant_binding(admin_ctx,
tenant_id,
return portbinding
def add_ha_port(self, context, router_id, network_id, tenant_id):
- port = self._core_plugin.create_port(context, {
- 'port':
- {'tenant_id': '',
- 'network_id': network_id,
- 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'admin_state_up': True,
- 'device_id': router_id,
- 'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
- 'name': constants.HA_PORT_NAME % tenant_id}})
+ args = {'tenant_id': '',
+ 'network_id': network_id,
+ 'admin_state_up': True,
+ 'device_id': router_id,
+ 'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
+ 'name': constants.HA_PORT_NAME % tenant_id}
+ port = p_utils.create_port(self._core_plugin, context,
+ {'port': args})
try:
return self._create_ha_port_binding(context, port['id'], router_id)
Common utilities and helper functions for Openstack Networking Plugins.
"""
+import webob.exc
+
+from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.plugins.common import constants as p_const
return status in (p_const.PENDING_CREATE,
p_const.PENDING_UPDATE,
p_const.PENDING_DELETE)
+
+
+def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):
+ attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[attr_name]
+ try:
+ attributes.populate_tenant_id(context, res_dict, attr_info, True)
+ attributes.verify_attributes(res_dict, attr_info)
+ except webob.exc.HTTPBadRequest as e:
+ # convert webob exception into ValueError as these functions are
+ # for internal use. webob exception doesn't make sense.
+ raise ValueError(e.detail)
+ attributes.fill_default_value(attr_info, res_dict,
+ check_allow_post=check_allow_post)
+ attributes.convert_value(attr_info, res_dict)
+ return res_dict
+
+
+def create_network(core_plugin, context, net):
+ net_data = _fixup_res_dict(context, attributes.NETWORKS,
+ net.get('network', {}))
+ return core_plugin.create_network(context, {'network': net_data})
+
+
+def create_subnet(core_plugin, context, subnet):
+ subnet_data = _fixup_res_dict(context, attributes.SUBNETS,
+ subnet.get('subnet', {}))
+ return core_plugin.create_subnet(context, {'subnet': subnet_data})
+
+
+def create_port(core_plugin, context, port, check_allow_post=True):
+ port_data = _fixup_res_dict(context, attributes.PORTS,
+ port.get('port', {}),
+ check_allow_post=check_allow_post)
+ return core_plugin.create_port(context, {'port': port_data})
set_dirty_p = mock.patch('neutron.quota.resource_registry.'
'set_resources_dirty')
self.mock_set_dirty = set_dirty_p.start()
+ self.utils_p = mock.patch('neutron.plugins.common.utils.create_port')
+ self.utils = self.utils_p.start()
def test_get_active_networks(self):
plugin_retval = [dict(id='a'), dict(id='b')]
'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
}
self.plugin.create_port.side_effect = exc
+ self.utils.side_effect = exc
self.assertIsNone(self.callbacks._port_action(self.plugin,
mock.Mock(),
{'port': port},
def _test__port_action_good_action(self, action, port, expected_call):
self.callbacks._port_action(self.plugin, mock.Mock(),
port, action)
- self.plugin.assert_has_calls([expected_call])
+ if action == 'create_port':
+ self.utils.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
+ else:
+ self.plugin.assert_has_calls([expected_call])
def test_port_action_create_port(self):
self._test__port_action_good_action(
def test_convert_to_list_non_iterable(self):
for item in (True, False, 1, 1.2, object()):
self.assertEqual([item], attributes.convert_to_list(item))
+
+
+class TestResDict(base.BaseTestCase):
+ class _MyException(Exception):
+ pass
+ _EXC_CLS = _MyException
+
+ def _test_fill_default_value(self, attr_info, expected, res_dict):
+ attributes.fill_default_value(attr_info, res_dict)
+ self.assertEqual(expected, res_dict)
+
+ def test_fill_default_value(self):
+ attr_info = {
+ 'key': {
+ 'allow_post': True,
+ 'default': attributes.ATTR_NOT_SPECIFIED,
+ },
+ }
+ self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
+ self._test_fill_default_value(
+ attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {})
+
+ attr_info = {
+ 'key': {
+ 'allow_post': True,
+ },
+ }
+ self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
+ self.assertRaises(ValueError, self._test_fill_default_value,
+ attr_info, {'key': 'X'}, {})
+ self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
+ attr_info, {}, self._EXC_CLS)
+ attr_info = {
+ 'key': {
+ 'allow_post': False,
+ },
+ }
+ self.assertRaises(ValueError, self._test_fill_default_value,
+ attr_info, {'key': 'X'}, {'key': 'X'})
+ self._test_fill_default_value(attr_info, {}, {})
+ self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
+ attr_info, {'key': 'X'}, self._EXC_CLS)
+
+ def _test_convert_value(self, attr_info, expected, res_dict):
+ attributes.convert_value(attr_info, res_dict)
+ self.assertEqual(expected, res_dict)
+
+ def test_convert_value(self):
+ attr_info = {
+ 'key': {
+ },
+ }
+ self._test_convert_value(attr_info,
+ {'key': attributes.ATTR_NOT_SPECIFIED},
+ {'key': attributes.ATTR_NOT_SPECIFIED})
+ self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
+ self._test_convert_value(attr_info,
+ {'other_key': 'X'}, {'other_key': 'X'})
+
+ attr_info = {
+ 'key': {
+ 'convert_to': attributes.convert_to_int,
+ },
+ }
+ self._test_convert_value(attr_info,
+ {'key': attributes.ATTR_NOT_SPECIFIED},
+ {'key': attributes.ATTR_NOT_SPECIFIED})
+ self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
+ self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
+ self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
+ attr_info, {'key': 1}, {'key': 'a'})
+
+ attr_info = {
+ 'key': {
+ 'validate': {'type:uuid': None},
+ },
+ }
+ self._test_convert_value(attr_info,
+ {'key': attributes.ATTR_NOT_SPECIFIED},
+ {'key': attributes.ATTR_NOT_SPECIFIED})
+ uuid_str = '01234567-1234-1234-1234-1234567890ab'
+ self._test_convert_value(attr_info,
+ {'key': uuid_str}, {'key': uuid_str})
+ self.assertRaises(ValueError, self._test_convert_value,
+ attr_info, {'key': 1}, {'key': 1})
+ self.assertRaises(self._EXC_CLS, attributes.convert_value,
+ attr_info, {'key': 1}, self._EXC_CLS)