From: Isaku Yamahata Date: Tue, 21 Oct 2014 02:30:32 +0000 (+0900) Subject: Replace internal calls of create_{network, subnet, port} X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=f1457af336b8b4ed72105b9d98a53f95c28c0c1e;p=openstack-build%2Fneutron-build.git Replace internal calls of create_{network, subnet, port} When API controller calls method create_{network, subnet, port), it made sure that the necessary default values for attrs are filled properly according to attr mapping. However, internal calls to these methods do not follow the convention, when extension codes miss these values, exceptions will be thrown. This patch introduces helper functions to fix up arguments and replaces the direct callers of those methods. Co-Authored-By: gong yong sheng Co-Authored-By: yalei wang Change-Id: Ibc6ff897a1a00665a403981a218100a698eb1c33 Closes-Bug: #1383546 --- diff --git a/neutron/api/rpc/handlers/dhcp_rpc.py b/neutron/api/rpc/handlers/dhcp_rpc.py index bba9f2341..9eb23f8eb 100644 --- a/neutron/api/rpc/handlers/dhcp_rpc.py +++ b/neutron/api/rpc/handlers/dhcp_rpc.py @@ -30,8 +30,10 @@ from neutron.db import api as db_api from neutron.extensions import portbindings from neutron.i18n import _LW from neutron import manager +from neutron.plugins.common import utils as p_utils from neutron.quota import resource_registry + LOG = logging.getLogger(__name__) @@ -77,7 +79,7 @@ class DhcpRpcCallback(object): """Perform port operations taking care of concurrency issues.""" try: if action == 'create_port': - return plugin.create_port(context, port) + return p_utils.create_port(plugin, context, port) elif action == 'update_port': return plugin.update_port(context, port['id'], port) else: diff --git a/neutron/api/v2/attributes.py b/neutron/api/v2/attributes.py index ff0165be4..cfc141e62 100644 --- a/neutron/api/v2/attributes.py +++ b/neutron/api/v2/attributes.py @@ -19,6 +19,7 @@ import netaddr from oslo_log import log as logging from oslo_utils import uuidutils import six +import webob.exc from neutron.common import constants from neutron.common import exceptions as n_exc @@ -884,3 +885,65 @@ PLURALS = {NETWORKS: NETWORK, 'allocation_pools': 'allocation_pool', 'fixed_ips': 'fixed_ip', 'extensions': 'extension'} + + +def fill_default_value(attr_info, res_dict, + exc_cls=ValueError, + check_allow_post=True): + for attr, attr_vals in six.iteritems(attr_info): + if attr_vals['allow_post']: + if ('default' not in attr_vals and + attr not in res_dict): + msg = _("Failed to parse request. Required " + "attribute '%s' not specified") % attr + raise exc_cls(msg) + res_dict[attr] = res_dict.get(attr, + attr_vals.get('default')) + elif check_allow_post: + if attr in res_dict: + msg = _("Attribute '%s' not allowed in POST") % attr + raise exc_cls(msg) + + +def convert_value(attr_info, res_dict, exc_cls=ValueError): + for attr, attr_vals in six.iteritems(attr_info): + if (attr not in res_dict or + res_dict[attr] is ATTR_NOT_SPECIFIED): + continue + # Convert values if necessary + if 'convert_to' in attr_vals: + res_dict[attr] = attr_vals['convert_to'](res_dict[attr]) + # Check that configured values are correct + if 'validate' not in attr_vals: + continue + for rule in attr_vals['validate']: + res = validators[rule](res_dict[attr], attr_vals['validate'][rule]) + if res: + msg_dict = dict(attr=attr, reason=res) + msg = _("Invalid input for %(attr)s. " + "Reason: %(reason)s.") % msg_dict + raise exc_cls(msg) + + +def populate_tenant_id(context, res_dict, attr_info, is_create): + if (('tenant_id' in res_dict and + res_dict['tenant_id'] != context.tenant_id and + not context.is_admin)): + msg = _("Specifying 'tenant_id' other than authenticated " + "tenant in request requires admin privileges") + raise webob.exc.HTTPBadRequest(msg) + + if is_create and 'tenant_id' not in res_dict: + if context.tenant_id: + res_dict['tenant_id'] = context.tenant_id + elif 'tenant_id' in attr_info: + msg = _("Running without keystone AuthN requires " + "that tenant_id is specified") + raise webob.exc.HTTPBadRequest(msg) + + +def verify_attributes(res_dict, attr_info): + extra_keys = set(res_dict.keys()) - set(attr_info.keys()) + if extra_keys: + msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys) + raise webob.exc.HTTPBadRequest(msg) diff --git a/neutron/api/v2/base.py b/neutron/api/v2/base.py index cd591b4f9..1a78aa4d3 100644 --- a/neutron/api/v2/base.py +++ b/neutron/api/v2/base.py @@ -596,23 +596,6 @@ class Controller(object): self._send_nova_notification(action, orig_object_copy, result) return result - @staticmethod - def _populate_tenant_id(context, res_dict, attr_info, is_create): - if (('tenant_id' in res_dict and - res_dict['tenant_id'] != context.tenant_id and - not context.is_admin)): - msg = _("Specifying 'tenant_id' other than authenticated " - "tenant in request requires admin privileges") - raise webob.exc.HTTPBadRequest(msg) - - if is_create and 'tenant_id' not in res_dict: - if context.tenant_id: - res_dict['tenant_id'] = context.tenant_id - elif 'tenant_id' in attr_info: - msg = _("Running without keystone AuthN requires " - "that tenant_id is specified") - raise webob.exc.HTTPBadRequest(msg) - @staticmethod def prepare_request_body(context, body, is_create, resource, attr_info, allow_bulk=False): @@ -652,56 +635,21 @@ class Controller(object): msg = _("Unable to find '%s' in request body") % resource raise webob.exc.HTTPBadRequest(msg) - Controller._populate_tenant_id(context, res_dict, attr_info, is_create) - Controller._verify_attributes(res_dict, attr_info) + attributes.populate_tenant_id(context, res_dict, attr_info, is_create) + attributes.verify_attributes(res_dict, attr_info) if is_create: # POST - for attr, attr_vals in six.iteritems(attr_info): - if attr_vals['allow_post']: - if ('default' not in attr_vals and - attr not in res_dict): - msg = _("Failed to parse request. Required " - "attribute '%s' not specified") % attr - raise webob.exc.HTTPBadRequest(msg) - res_dict[attr] = res_dict.get(attr, - attr_vals.get('default')) - else: - if attr in res_dict: - msg = _("Attribute '%s' not allowed in POST") % attr - raise webob.exc.HTTPBadRequest(msg) + attributes.fill_default_value(attr_info, res_dict, + webob.exc.HTTPBadRequest) else: # PUT for attr, attr_vals in six.iteritems(attr_info): if attr in res_dict and not attr_vals['allow_put']: msg = _("Cannot update read-only attribute %s") % attr raise webob.exc.HTTPBadRequest(msg) - for attr, attr_vals in six.iteritems(attr_info): - if (attr not in res_dict or - res_dict[attr] is attributes.ATTR_NOT_SPECIFIED): - continue - # Convert values if necessary - if 'convert_to' in attr_vals: - res_dict[attr] = attr_vals['convert_to'](res_dict[attr]) - # Check that configured values are correct - if 'validate' not in attr_vals: - continue - for rule in attr_vals['validate']: - res = attributes.validators[rule](res_dict[attr], - attr_vals['validate'][rule]) - if res: - msg_dict = dict(attr=attr, reason=res) - msg = _("Invalid input for %(attr)s. " - "Reason: %(reason)s.") % msg_dict - raise webob.exc.HTTPBadRequest(msg) + attributes.convert_value(attr_info, res_dict, webob.exc.HTTPBadRequest) return body - @staticmethod - def _verify_attributes(res_dict, attr_info): - extra_keys = set(res_dict.keys()) - set(attr_info.keys()) - if extra_keys: - msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys) - raise webob.exc.HTTPBadRequest(msg) - def _validate_network_tenant_ownership(self, request, resource_item): # TODO(salvatore-orlando): consider whether this check can be folded # in the policy engine diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 14b1dc50d..852fd9b83 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -40,6 +40,7 @@ from neutron.extensions import l3 from neutron.i18n import _LI, _LE from neutron import manager from neutron.plugins.common import constants +from neutron.plugins.common import utils as p_utils LOG = logging.getLogger(__name__) @@ -278,15 +279,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): def _create_router_gw_port(self, context, router, network_id, ext_ips): # Port has no 'tenant-id', as it is hidden from user - gw_port = self._core_plugin.create_port(context.elevated(), { - 'port': {'tenant_id': '', # intentionally not set + port_data = {'tenant_id': '', # intentionally not set 'network_id': network_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, 'fixed_ips': ext_ips or attributes.ATTR_NOT_SPECIFIED, 'device_id': router['id'], 'device_owner': DEVICE_OWNER_ROUTER_GW, 'admin_state_up': True, - 'name': ''}}) + 'name': ''} + gw_port = p_utils.create_port(self._core_plugin, + context.elevated(), {'port': port_data}) if not gw_port['fixed_ips']: LOG.debug('No IPs available for external network %s', @@ -596,16 +597,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): port['port_id'], {'port': {'fixed_ips': fixed_ips}}), [subnet], False - return self._core_plugin.create_port(context, { - 'port': - {'tenant_id': subnet['tenant_id'], - 'network_id': subnet['network_id'], - 'fixed_ips': [fixed_ip], - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'admin_state_up': True, - 'device_id': router.id, - 'device_owner': owner, - 'name': ''}}), [subnet], True + port_data = {'tenant_id': subnet['tenant_id'], + 'network_id': subnet['network_id'], + 'fixed_ips': [fixed_ip], + 'admin_state_up': True, + 'device_id': router.id, + 'device_owner': owner, + 'name': ''} + return p_utils.create_port(self._core_plugin, context, + {'port': port_data}), [subnet], True @staticmethod def _make_router_interface_info( @@ -956,14 +956,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): port = {'tenant_id': '', # tenant intentionally not set 'network_id': f_net_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, 'admin_state_up': True, 'device_id': fip_id, 'device_owner': DEVICE_OWNER_FLOATINGIP, 'status': l3_constants.PORT_STATUS_NOTAPPLICABLE, 'name': ''} - if fip.get('floating_ip_address'): port['fixed_ips'] = [ {'ip_address': fip['floating_ip_address']}] @@ -971,9 +968,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): if fip.get('subnet_id'): port['fixed_ips'] = [ {'subnet_id': fip['subnet_id']}] - external_port = self._core_plugin.create_port(context.elevated(), - {'port': port}) + # 'status' in port dict could not be updated by default, use + # check_allow_post to stop the verification of system + external_port = p_utils.create_port(self._core_plugin, + context.elevated(), + {'port': port}, + check_allow_post=False) # Ensure IPv4 addresses are allocated on external port external_ipv4_ips = self._port_ipv4_fixed_ips(external_port) if not external_ipv4_ips: diff --git a/neutron/db/l3_dvr_db.py b/neutron/db/l3_dvr_db.py index 9438ab043..16f48c86f 100644 --- a/neutron/db/l3_dvr_db.py +++ b/neutron/db/l3_dvr_db.py @@ -35,6 +35,7 @@ from neutron.extensions import portbindings from neutron.i18n import _LI from neutron import manager from neutron.plugins.common import constants +from neutron.plugins.common import utils as p_utils LOG = logging.getLogger(__name__) @@ -563,17 +564,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin, if not f_port: LOG.info(_LI('Agent Gateway port does not exist,' ' so create one: %s'), f_port) - agent_port = self._core_plugin.create_port( - context, - {'port': {'tenant_id': '', - 'network_id': network_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, - 'device_id': l3_agent_db['id'], - 'device_owner': DEVICE_OWNER_AGENT_GW, - 'binding:host_id': host, - 'admin_state_up': True, - 'name': ''}}) + port_data = {'tenant_id': '', + 'network_id': network_id, + 'device_id': l3_agent_db['id'], + 'device_owner': DEVICE_OWNER_AGENT_GW, + 'binding:host_id': host, + 'admin_state_up': True, + 'name': ''} + agent_port = p_utils.create_port(self._core_plugin, context, + {'port': port_data}) if agent_port: self._populate_subnets_for_ports(context, [agent_port]) return agent_port @@ -598,16 +597,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin, def _add_csnat_router_interface_port( self, context, router, network_id, subnet_id, do_pop=True): """Add SNAT interface to the specified router and subnet.""" - snat_port = self._core_plugin.create_port( - context, - {'port': {'tenant_id': '', - 'network_id': network_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'fixed_ips': [{'subnet_id': subnet_id}], - 'device_id': router.id, - 'device_owner': DEVICE_OWNER_DVR_SNAT, - 'admin_state_up': True, - 'name': ''}}) + port_data = {'tenant_id': '', + 'network_id': network_id, + 'fixed_ips': [{'subnet_id': subnet_id}], + 'device_id': router.id, + 'device_owner': DEVICE_OWNER_DVR_SNAT, + 'admin_state_up': True, + 'name': ''} + snat_port = p_utils.create_port(self._core_plugin, context, + {'port': port_data}) if not snat_port: msg = _("Unable to create the SNAT Interface Port") raise n_exc.BadRequest(resource='router', msg=msg) diff --git a/neutron/db/l3_hamode_db.py b/neutron/db/l3_hamode_db.py index 0d9b0bb39..7b286869e 100644 --- a/neutron/db/l3_hamode_db.py +++ b/neutron/db/l3_hamode_db.py @@ -32,6 +32,8 @@ from neutron.extensions import l3_ext_ha_mode as l3_ha from neutron.extensions import portbindings from neutron.extensions import providernet from neutron.i18n import _LI +from neutron.plugins.common import utils as p_utils + VR_ID_RANGE = set(range(1, 255)) MAX_ALLOCATION_TRIES = 10 @@ -219,18 +221,15 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin): context, ha_network.network_id, router.id) def _create_ha_subnet(self, context, network_id, tenant_id): - args = {'subnet': - {'network_id': network_id, - 'tenant_id': '', - 'name': constants.HA_SUBNET_NAME % tenant_id, - 'ip_version': 4, - 'cidr': cfg.CONF.l3_ha_net_cidr, - 'enable_dhcp': False, - 'host_routes': attributes.ATTR_NOT_SPECIFIED, - 'dns_nameservers': attributes.ATTR_NOT_SPECIFIED, - 'allocation_pools': attributes.ATTR_NOT_SPECIFIED, - 'gateway_ip': None}} - return self._core_plugin.create_subnet(context, args) + args = {'network_id': network_id, + 'tenant_id': '', + 'name': constants.HA_SUBNET_NAME % tenant_id, + 'ip_version': 4, + 'cidr': cfg.CONF.l3_ha_net_cidr, + 'enable_dhcp': False, + 'gateway_ip': None} + return p_utils.create_subnet(self._core_plugin, context, + {'subnet': args}) def _create_ha_network_tenant_binding(self, context, tenant_id, network_id): @@ -255,11 +254,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin): {'name': constants.HA_NETWORK_NAME % tenant_id, 'tenant_id': '', 'shared': False, - 'admin_state_up': True, - 'status': constants.NET_STATUS_ACTIVE}} + 'admin_state_up': True}} self._add_ha_network_settings(args['network']) + network = p_utils.create_network(self._core_plugin, admin_ctx, args) - network = self._core_plugin.create_network(admin_ctx, args) try: ha_network = self._create_ha_network_tenant_binding(admin_ctx, tenant_id, @@ -312,16 +310,14 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin): return portbinding def add_ha_port(self, context, router_id, network_id, tenant_id): - port = self._core_plugin.create_port(context, { - 'port': - {'tenant_id': '', - 'network_id': network_id, - 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'admin_state_up': True, - 'device_id': router_id, - 'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF, - 'name': constants.HA_PORT_NAME % tenant_id}}) + args = {'tenant_id': '', + 'network_id': network_id, + 'admin_state_up': True, + 'device_id': router_id, + 'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF, + 'name': constants.HA_PORT_NAME % tenant_id} + port = p_utils.create_port(self._core_plugin, context, + {'port': args}) try: return self._create_ha_port_binding(context, port['id'], router_id) diff --git a/neutron/plugins/common/utils.py b/neutron/plugins/common/utils.py index 40ca2cffd..287ea1a30 100644 --- a/neutron/plugins/common/utils.py +++ b/neutron/plugins/common/utils.py @@ -16,6 +16,9 @@ Common utilities and helper functions for Openstack Networking Plugins. """ +import webob.exc + +from neutron.api.v2 import attributes from neutron.common import exceptions as n_exc from neutron.plugins.common import constants as p_const @@ -96,3 +99,37 @@ def in_pending_status(status): return status in (p_const.PENDING_CREATE, p_const.PENDING_UPDATE, p_const.PENDING_DELETE) + + +def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True): + attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[attr_name] + try: + attributes.populate_tenant_id(context, res_dict, attr_info, True) + attributes.verify_attributes(res_dict, attr_info) + except webob.exc.HTTPBadRequest as e: + # convert webob exception into ValueError as these functions are + # for internal use. webob exception doesn't make sense. + raise ValueError(e.detail) + attributes.fill_default_value(attr_info, res_dict, + check_allow_post=check_allow_post) + attributes.convert_value(attr_info, res_dict) + return res_dict + + +def create_network(core_plugin, context, net): + net_data = _fixup_res_dict(context, attributes.NETWORKS, + net.get('network', {})) + return core_plugin.create_network(context, {'network': net_data}) + + +def create_subnet(core_plugin, context, subnet): + subnet_data = _fixup_res_dict(context, attributes.SUBNETS, + subnet.get('subnet', {})) + return core_plugin.create_subnet(context, {'subnet': subnet_data}) + + +def create_port(core_plugin, context, port, check_allow_post=True): + port_data = _fixup_res_dict(context, attributes.PORTS, + port.get('port', {}), + check_allow_post=check_allow_post) + return core_plugin.create_port(context, {'port': port_data}) diff --git a/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py index a06fd2a0d..d57632139 100644 --- a/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py @@ -36,6 +36,8 @@ class TestDhcpRpcCallback(base.BaseTestCase): set_dirty_p = mock.patch('neutron.quota.resource_registry.' 'set_resources_dirty') self.mock_set_dirty = set_dirty_p.start() + self.utils_p = mock.patch('neutron.plugins.common.utils.create_port') + self.utils = self.utils_p.start() def test_get_active_networks(self): plugin_retval = [dict(id='a'), dict(id='b')] @@ -79,6 +81,7 @@ class TestDhcpRpcCallback(base.BaseTestCase): 'fixed_ips': [{'subnet_id': 'foo_subnet_id'}] } self.plugin.create_port.side_effect = exc + self.utils.side_effect = exc self.assertIsNone(self.callbacks._port_action(self.plugin, mock.Mock(), {'port': port}, @@ -87,7 +90,10 @@ class TestDhcpRpcCallback(base.BaseTestCase): def _test__port_action_good_action(self, action, port, expected_call): self.callbacks._port_action(self.plugin, mock.Mock(), port, action) - self.plugin.assert_has_calls([expected_call]) + if action == 'create_port': + self.utils.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY) + else: + self.plugin.assert_has_calls([expected_call]) def test_port_action_create_port(self): self._test__port_action_good_action( diff --git a/neutron/tests/unit/api/v2/test_attributes.py b/neutron/tests/unit/api/v2/test_attributes.py index 512fc3022..7b03a91d3 100644 --- a/neutron/tests/unit/api/v2/test_attributes.py +++ b/neutron/tests/unit/api/v2/test_attributes.py @@ -878,3 +878,90 @@ class TestConvertToList(base.BaseTestCase): def test_convert_to_list_non_iterable(self): for item in (True, False, 1, 1.2, object()): self.assertEqual([item], attributes.convert_to_list(item)) + + +class TestResDict(base.BaseTestCase): + class _MyException(Exception): + pass + _EXC_CLS = _MyException + + def _test_fill_default_value(self, attr_info, expected, res_dict): + attributes.fill_default_value(attr_info, res_dict) + self.assertEqual(expected, res_dict) + + def test_fill_default_value(self): + attr_info = { + 'key': { + 'allow_post': True, + 'default': attributes.ATTR_NOT_SPECIFIED, + }, + } + self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'}) + self._test_fill_default_value( + attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {}) + + attr_info = { + 'key': { + 'allow_post': True, + }, + } + self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'}) + self.assertRaises(ValueError, self._test_fill_default_value, + attr_info, {'key': 'X'}, {}) + self.assertRaises(self._EXC_CLS, attributes.fill_default_value, + attr_info, {}, self._EXC_CLS) + attr_info = { + 'key': { + 'allow_post': False, + }, + } + self.assertRaises(ValueError, self._test_fill_default_value, + attr_info, {'key': 'X'}, {'key': 'X'}) + self._test_fill_default_value(attr_info, {}, {}) + self.assertRaises(self._EXC_CLS, attributes.fill_default_value, + attr_info, {'key': 'X'}, self._EXC_CLS) + + def _test_convert_value(self, attr_info, expected, res_dict): + attributes.convert_value(attr_info, res_dict) + self.assertEqual(expected, res_dict) + + def test_convert_value(self): + attr_info = { + 'key': { + }, + } + self._test_convert_value(attr_info, + {'key': attributes.ATTR_NOT_SPECIFIED}, + {'key': attributes.ATTR_NOT_SPECIFIED}) + self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'}) + self._test_convert_value(attr_info, + {'other_key': 'X'}, {'other_key': 'X'}) + + attr_info = { + 'key': { + 'convert_to': attributes.convert_to_int, + }, + } + self._test_convert_value(attr_info, + {'key': attributes.ATTR_NOT_SPECIFIED}, + {'key': attributes.ATTR_NOT_SPECIFIED}) + self._test_convert_value(attr_info, {'key': 1}, {'key': '1'}) + self._test_convert_value(attr_info, {'key': 1}, {'key': 1}) + self.assertRaises(n_exc.InvalidInput, self._test_convert_value, + attr_info, {'key': 1}, {'key': 'a'}) + + attr_info = { + 'key': { + 'validate': {'type:uuid': None}, + }, + } + self._test_convert_value(attr_info, + {'key': attributes.ATTR_NOT_SPECIFIED}, + {'key': attributes.ATTR_NOT_SPECIFIED}) + uuid_str = '01234567-1234-1234-1234-1234567890ab' + self._test_convert_value(attr_info, + {'key': uuid_str}, {'key': uuid_str}) + self.assertRaises(ValueError, self._test_convert_value, + attr_info, {'key': 1}, {'key': 1}) + self.assertRaises(self._EXC_CLS, attributes.convert_value, + attr_info, {'key': 1}, self._EXC_CLS)