]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Replace internal calls of create_{network, subnet, port}
authorIsaku Yamahata <isaku.yamahata@intel.com>
Tue, 21 Oct 2014 02:30:32 +0000 (11:30 +0900)
committerYalei Wang <yalei.wang@intel.com>
Fri, 14 Aug 2015 11:34:54 +0000 (19:34 +0800)
When API controller calls method create_{network, subnet, port),
it made sure that the necessary default values for attrs are filled properly
according to attr mapping.

However, internal calls to these methods do not follow the convention,
when extension codes miss these values, exceptions will be thrown.

This patch introduces helper functions to fix up arguments and replaces
the direct callers of those methods.

Co-Authored-By: gong yong sheng <gong.yongsheng@99cloud.net>
Co-Authored-By: yalei wang <yalei.wang@intel.com>
Change-Id: Ibc6ff897a1a00665a403981a218100a698eb1c33
Closes-Bug: #1383546

neutron/api/rpc/handlers/dhcp_rpc.py
neutron/api/v2/attributes.py
neutron/api/v2/base.py
neutron/db/l3_db.py
neutron/db/l3_dvr_db.py
neutron/db/l3_hamode_db.py
neutron/plugins/common/utils.py
neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py
neutron/tests/unit/api/v2/test_attributes.py

index bba9f2341faa2c79d20c61fb0f65bddf2dcf64b3..9eb23f8eb790f6d05bb11eaa756ec76f026ad91b 100644 (file)
@@ -30,8 +30,10 @@ from neutron.db import api as db_api
 from neutron.extensions import portbindings
 from neutron.i18n import _LW
 from neutron import manager
+from neutron.plugins.common import utils as p_utils
 from neutron.quota import resource_registry
 
+
 LOG = logging.getLogger(__name__)
 
 
@@ -77,7 +79,7 @@ class DhcpRpcCallback(object):
         """Perform port operations taking care of concurrency issues."""
         try:
             if action == 'create_port':
-                return plugin.create_port(context, port)
+                return p_utils.create_port(plugin, context, port)
             elif action == 'update_port':
                 return plugin.update_port(context, port['id'], port)
             else:
index ff0165be431c9ead91b5a5d89d9d2c1dcb7cc1d9..cfc141e62ec4045f47232c32ff4bce663a6cdde8 100644 (file)
@@ -19,6 +19,7 @@ import netaddr
 from oslo_log import log as logging
 from oslo_utils import uuidutils
 import six
+import webob.exc
 
 from neutron.common import constants
 from neutron.common import exceptions as n_exc
@@ -884,3 +885,65 @@ PLURALS = {NETWORKS: NETWORK,
            'allocation_pools': 'allocation_pool',
            'fixed_ips': 'fixed_ip',
            'extensions': 'extension'}
+
+
+def fill_default_value(attr_info, res_dict,
+                       exc_cls=ValueError,
+                       check_allow_post=True):
+    for attr, attr_vals in six.iteritems(attr_info):
+        if attr_vals['allow_post']:
+            if ('default' not in attr_vals and
+                attr not in res_dict):
+                msg = _("Failed to parse request. Required "
+                        "attribute '%s' not specified") % attr
+                raise exc_cls(msg)
+            res_dict[attr] = res_dict.get(attr,
+                                          attr_vals.get('default'))
+        elif check_allow_post:
+            if attr in res_dict:
+                msg = _("Attribute '%s' not allowed in POST") % attr
+                raise exc_cls(msg)
+
+
+def convert_value(attr_info, res_dict, exc_cls=ValueError):
+    for attr, attr_vals in six.iteritems(attr_info):
+        if (attr not in res_dict or
+            res_dict[attr] is ATTR_NOT_SPECIFIED):
+            continue
+        # Convert values if necessary
+        if 'convert_to' in attr_vals:
+            res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
+        # Check that configured values are correct
+        if 'validate' not in attr_vals:
+            continue
+        for rule in attr_vals['validate']:
+            res = validators[rule](res_dict[attr], attr_vals['validate'][rule])
+            if res:
+                msg_dict = dict(attr=attr, reason=res)
+                msg = _("Invalid input for %(attr)s. "
+                        "Reason: %(reason)s.") % msg_dict
+                raise exc_cls(msg)
+
+
+def populate_tenant_id(context, res_dict, attr_info, is_create):
+    if (('tenant_id' in res_dict and
+         res_dict['tenant_id'] != context.tenant_id and
+         not context.is_admin)):
+        msg = _("Specifying 'tenant_id' other than authenticated "
+                "tenant in request requires admin privileges")
+        raise webob.exc.HTTPBadRequest(msg)
+
+    if is_create and 'tenant_id' not in res_dict:
+        if context.tenant_id:
+            res_dict['tenant_id'] = context.tenant_id
+        elif 'tenant_id' in attr_info:
+            msg = _("Running without keystone AuthN requires "
+                    "that tenant_id is specified")
+            raise webob.exc.HTTPBadRequest(msg)
+
+
+def verify_attributes(res_dict, attr_info):
+    extra_keys = set(res_dict.keys()) - set(attr_info.keys())
+    if extra_keys:
+        msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
+        raise webob.exc.HTTPBadRequest(msg)
index cd591b4f9eaf45246e0d573811fecef9659c0ca8..1a78aa4d338c0612567f847903284f37a77b54ad 100644 (file)
@@ -596,23 +596,6 @@ class Controller(object):
         self._send_nova_notification(action, orig_object_copy, result)
         return result
 
-    @staticmethod
-    def _populate_tenant_id(context, res_dict, attr_info, is_create):
-        if (('tenant_id' in res_dict and
-             res_dict['tenant_id'] != context.tenant_id and
-             not context.is_admin)):
-            msg = _("Specifying 'tenant_id' other than authenticated "
-                    "tenant in request requires admin privileges")
-            raise webob.exc.HTTPBadRequest(msg)
-
-        if is_create and 'tenant_id' not in res_dict:
-            if context.tenant_id:
-                res_dict['tenant_id'] = context.tenant_id
-            elif 'tenant_id' in attr_info:
-                msg = _("Running without keystone AuthN requires "
-                        "that tenant_id is specified")
-                raise webob.exc.HTTPBadRequest(msg)
-
     @staticmethod
     def prepare_request_body(context, body, is_create, resource, attr_info,
                              allow_bulk=False):
@@ -652,56 +635,21 @@ class Controller(object):
             msg = _("Unable to find '%s' in request body") % resource
             raise webob.exc.HTTPBadRequest(msg)
 
-        Controller._populate_tenant_id(context, res_dict, attr_info, is_create)
-        Controller._verify_attributes(res_dict, attr_info)
+        attributes.populate_tenant_id(context, res_dict, attr_info, is_create)
+        attributes.verify_attributes(res_dict, attr_info)
 
         if is_create:  # POST
-            for attr, attr_vals in six.iteritems(attr_info):
-                if attr_vals['allow_post']:
-                    if ('default' not in attr_vals and
-                        attr not in res_dict):
-                        msg = _("Failed to parse request. Required "
-                                "attribute '%s' not specified") % attr
-                        raise webob.exc.HTTPBadRequest(msg)
-                    res_dict[attr] = res_dict.get(attr,
-                                                  attr_vals.get('default'))
-                else:
-                    if attr in res_dict:
-                        msg = _("Attribute '%s' not allowed in POST") % attr
-                        raise webob.exc.HTTPBadRequest(msg)
+            attributes.fill_default_value(attr_info, res_dict,
+                                          webob.exc.HTTPBadRequest)
         else:  # PUT
             for attr, attr_vals in six.iteritems(attr_info):
                 if attr in res_dict and not attr_vals['allow_put']:
                     msg = _("Cannot update read-only attribute %s") % attr
                     raise webob.exc.HTTPBadRequest(msg)
 
-        for attr, attr_vals in six.iteritems(attr_info):
-            if (attr not in res_dict or
-                res_dict[attr] is attributes.ATTR_NOT_SPECIFIED):
-                continue
-            # Convert values if necessary
-            if 'convert_to' in attr_vals:
-                res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
-            # Check that configured values are correct
-            if 'validate' not in attr_vals:
-                continue
-            for rule in attr_vals['validate']:
-                res = attributes.validators[rule](res_dict[attr],
-                                                  attr_vals['validate'][rule])
-                if res:
-                    msg_dict = dict(attr=attr, reason=res)
-                    msg = _("Invalid input for %(attr)s. "
-                            "Reason: %(reason)s.") % msg_dict
-                    raise webob.exc.HTTPBadRequest(msg)
+        attributes.convert_value(attr_info, res_dict, webob.exc.HTTPBadRequest)
         return body
 
-    @staticmethod
-    def _verify_attributes(res_dict, attr_info):
-        extra_keys = set(res_dict.keys()) - set(attr_info.keys())
-        if extra_keys:
-            msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
-            raise webob.exc.HTTPBadRequest(msg)
-
     def _validate_network_tenant_ownership(self, request, resource_item):
         # TODO(salvatore-orlando): consider whether this check can be folded
         # in the policy engine
index 14b1dc50dea90a90228e56f9229be5e368bfe4aa..852fd9b8333c4d8a7711b927e496c2f28eb48e23 100644 (file)
@@ -40,6 +40,7 @@ from neutron.extensions import l3
 from neutron.i18n import _LI, _LE
 from neutron import manager
 from neutron.plugins.common import constants
+from neutron.plugins.common import utils as p_utils
 
 LOG = logging.getLogger(__name__)
 
@@ -278,15 +279,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
 
     def _create_router_gw_port(self, context, router, network_id, ext_ips):
         # Port has no 'tenant-id', as it is hidden from user
-        gw_port = self._core_plugin.create_port(context.elevated(), {
-            'port': {'tenant_id': '',  # intentionally not set
+        port_data = {'tenant_id': '',  # intentionally not set
                      'network_id': network_id,
-                     'mac_address': attributes.ATTR_NOT_SPECIFIED,
                      'fixed_ips': ext_ips or attributes.ATTR_NOT_SPECIFIED,
                      'device_id': router['id'],
                      'device_owner': DEVICE_OWNER_ROUTER_GW,
                      'admin_state_up': True,
-                     'name': ''}})
+                     'name': ''}
+        gw_port = p_utils.create_port(self._core_plugin,
+                                      context.elevated(), {'port': port_data})
 
         if not gw_port['fixed_ips']:
             LOG.debug('No IPs available for external network %s',
@@ -596,16 +597,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
                         port['port_id'], {'port':
                             {'fixed_ips': fixed_ips}}), [subnet], False
 
-        return self._core_plugin.create_port(context, {
-            'port':
-            {'tenant_id': subnet['tenant_id'],
-             'network_id': subnet['network_id'],
-             'fixed_ips': [fixed_ip],
-             'mac_address': attributes.ATTR_NOT_SPECIFIED,
-             'admin_state_up': True,
-             'device_id': router.id,
-             'device_owner': owner,
-             'name': ''}}), [subnet], True
+        port_data = {'tenant_id': subnet['tenant_id'],
+                     'network_id': subnet['network_id'],
+                     'fixed_ips': [fixed_ip],
+                     'admin_state_up': True,
+                     'device_id': router.id,
+                     'device_owner': owner,
+                     'name': ''}
+        return p_utils.create_port(self._core_plugin, context,
+                                   {'port': port_data}), [subnet], True
 
     @staticmethod
     def _make_router_interface_info(
@@ -956,14 +956,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
 
             port = {'tenant_id': '',  # tenant intentionally not set
                     'network_id': f_net_id,
-                    'mac_address': attributes.ATTR_NOT_SPECIFIED,
-                    'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
                     'admin_state_up': True,
                     'device_id': fip_id,
                     'device_owner': DEVICE_OWNER_FLOATINGIP,
                     'status': l3_constants.PORT_STATUS_NOTAPPLICABLE,
                     'name': ''}
-
             if fip.get('floating_ip_address'):
                 port['fixed_ips'] = [
                     {'ip_address': fip['floating_ip_address']}]
@@ -971,9 +968,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
             if fip.get('subnet_id'):
                 port['fixed_ips'] = [
                     {'subnet_id': fip['subnet_id']}]
-            external_port = self._core_plugin.create_port(context.elevated(),
-                                                          {'port': port})
 
+            # 'status' in port dict could not be updated by default, use
+            # check_allow_post to stop the verification of system
+            external_port = p_utils.create_port(self._core_plugin,
+                                                context.elevated(),
+                                                {'port': port},
+                                                check_allow_post=False)
             # Ensure IPv4 addresses are allocated on external port
             external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
             if not external_ipv4_ips:
index 9438ab04371ce19e65fcaa510551bc21520ef622..16f48c86f33d2d55e3a3d58064eb4e10e4487063 100644 (file)
@@ -35,6 +35,7 @@ from neutron.extensions import portbindings
 from neutron.i18n import _LI
 from neutron import manager
 from neutron.plugins.common import constants
+from neutron.plugins.common import utils as p_utils
 
 
 LOG = logging.getLogger(__name__)
@@ -563,17 +564,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
             if not f_port:
                 LOG.info(_LI('Agent Gateway port does not exist,'
                              ' so create one: %s'), f_port)
-                agent_port = self._core_plugin.create_port(
-                    context,
-                    {'port': {'tenant_id': '',
-                              'network_id': network_id,
-                              'mac_address': attributes.ATTR_NOT_SPECIFIED,
-                              'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
-                              'device_id': l3_agent_db['id'],
-                              'device_owner': DEVICE_OWNER_AGENT_GW,
-                              'binding:host_id': host,
-                              'admin_state_up': True,
-                              'name': ''}})
+                port_data = {'tenant_id': '',
+                             'network_id': network_id,
+                             'device_id': l3_agent_db['id'],
+                             'device_owner': DEVICE_OWNER_AGENT_GW,
+                             'binding:host_id': host,
+                             'admin_state_up': True,
+                             'name': ''}
+                agent_port = p_utils.create_port(self._core_plugin, context,
+                                                 {'port': port_data})
                 if agent_port:
                     self._populate_subnets_for_ports(context, [agent_port])
                     return agent_port
@@ -598,16 +597,15 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
     def _add_csnat_router_interface_port(
             self, context, router, network_id, subnet_id, do_pop=True):
         """Add SNAT interface to the specified router and subnet."""
-        snat_port = self._core_plugin.create_port(
-            context,
-            {'port': {'tenant_id': '',
-                      'network_id': network_id,
-                      'mac_address': attributes.ATTR_NOT_SPECIFIED,
-                      'fixed_ips': [{'subnet_id': subnet_id}],
-                      'device_id': router.id,
-                      'device_owner': DEVICE_OWNER_DVR_SNAT,
-                      'admin_state_up': True,
-                      'name': ''}})
+        port_data = {'tenant_id': '',
+                     'network_id': network_id,
+                     'fixed_ips': [{'subnet_id': subnet_id}],
+                     'device_id': router.id,
+                     'device_owner': DEVICE_OWNER_DVR_SNAT,
+                     'admin_state_up': True,
+                     'name': ''}
+        snat_port = p_utils.create_port(self._core_plugin, context,
+                                        {'port': port_data})
         if not snat_port:
             msg = _("Unable to create the SNAT Interface Port")
             raise n_exc.BadRequest(resource='router', msg=msg)
index 0d9b0bb396557083a4570197280ef35caea00324..7b286869e1a9c3080e2caefef734e6a867e1c7c0 100644 (file)
@@ -32,6 +32,8 @@ from neutron.extensions import l3_ext_ha_mode as l3_ha
 from neutron.extensions import portbindings
 from neutron.extensions import providernet
 from neutron.i18n import _LI
+from neutron.plugins.common import utils as p_utils
+
 
 VR_ID_RANGE = set(range(1, 255))
 MAX_ALLOCATION_TRIES = 10
@@ -219,18 +221,15 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
                 context, ha_network.network_id, router.id)
 
     def _create_ha_subnet(self, context, network_id, tenant_id):
-        args = {'subnet':
-                {'network_id': network_id,
-                 'tenant_id': '',
-                 'name': constants.HA_SUBNET_NAME % tenant_id,
-                 'ip_version': 4,
-                 'cidr': cfg.CONF.l3_ha_net_cidr,
-                 'enable_dhcp': False,
-                 'host_routes': attributes.ATTR_NOT_SPECIFIED,
-                 'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
-                 'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
-                 'gateway_ip': None}}
-        return self._core_plugin.create_subnet(context, args)
+        args = {'network_id': network_id,
+                'tenant_id': '',
+                'name': constants.HA_SUBNET_NAME % tenant_id,
+                'ip_version': 4,
+                'cidr': cfg.CONF.l3_ha_net_cidr,
+                'enable_dhcp': False,
+                'gateway_ip': None}
+        return p_utils.create_subnet(self._core_plugin, context,
+                                     {'subnet': args})
 
     def _create_ha_network_tenant_binding(self, context, tenant_id,
                                           network_id):
@@ -255,11 +254,10 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
                 {'name': constants.HA_NETWORK_NAME % tenant_id,
                  'tenant_id': '',
                  'shared': False,
-                 'admin_state_up': True,
-                 'status': constants.NET_STATUS_ACTIVE}}
+                 'admin_state_up': True}}
         self._add_ha_network_settings(args['network'])
+        network = p_utils.create_network(self._core_plugin, admin_ctx, args)
 
-        network = self._core_plugin.create_network(admin_ctx, args)
         try:
             ha_network = self._create_ha_network_tenant_binding(admin_ctx,
                                                                 tenant_id,
@@ -312,16 +310,14 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
         return portbinding
 
     def add_ha_port(self, context, router_id, network_id, tenant_id):
-        port = self._core_plugin.create_port(context, {
-            'port':
-            {'tenant_id': '',
-             'network_id': network_id,
-             'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
-             'mac_address': attributes.ATTR_NOT_SPECIFIED,
-             'admin_state_up': True,
-             'device_id': router_id,
-             'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
-             'name': constants.HA_PORT_NAME % tenant_id}})
+        args = {'tenant_id': '',
+                'network_id': network_id,
+                'admin_state_up': True,
+                'device_id': router_id,
+                'device_owner': constants.DEVICE_OWNER_ROUTER_HA_INTF,
+                'name': constants.HA_PORT_NAME % tenant_id}
+        port = p_utils.create_port(self._core_plugin, context,
+                                 {'port': args})
 
         try:
             return self._create_ha_port_binding(context, port['id'], router_id)
index 40ca2cffd3532cbec220f72dcfb243a784a25b61..287ea1a30013f539e790bbe0d97b36ba0dbf9862 100644 (file)
@@ -16,6 +16,9 @@
 Common utilities and helper functions for Openstack Networking Plugins.
 """
 
+import webob.exc
+
+from neutron.api.v2 import attributes
 from neutron.common import exceptions as n_exc
 from neutron.plugins.common import constants as p_const
 
@@ -96,3 +99,37 @@ def in_pending_status(status):
     return status in (p_const.PENDING_CREATE,
                       p_const.PENDING_UPDATE,
                       p_const.PENDING_DELETE)
+
+
+def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):
+    attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[attr_name]
+    try:
+        attributes.populate_tenant_id(context, res_dict, attr_info, True)
+        attributes.verify_attributes(res_dict, attr_info)
+    except webob.exc.HTTPBadRequest as e:
+        # convert webob exception into ValueError as these functions are
+        # for internal use. webob exception doesn't make sense.
+        raise ValueError(e.detail)
+    attributes.fill_default_value(attr_info, res_dict,
+                                  check_allow_post=check_allow_post)
+    attributes.convert_value(attr_info, res_dict)
+    return res_dict
+
+
+def create_network(core_plugin, context, net):
+    net_data = _fixup_res_dict(context, attributes.NETWORKS,
+                               net.get('network', {}))
+    return core_plugin.create_network(context, {'network': net_data})
+
+
+def create_subnet(core_plugin, context, subnet):
+    subnet_data = _fixup_res_dict(context, attributes.SUBNETS,
+                                  subnet.get('subnet', {}))
+    return core_plugin.create_subnet(context, {'subnet': subnet_data})
+
+
+def create_port(core_plugin, context, port, check_allow_post=True):
+    port_data = _fixup_res_dict(context, attributes.PORTS,
+                                port.get('port', {}),
+                                check_allow_post=check_allow_post)
+    return core_plugin.create_port(context, {'port': port_data})
index a06fd2a0dd5ec6bb3f44acbdfa49bd0e5f5aefdb..d57632139f6ecf253eafbda7ae0849e8d94066e0 100644 (file)
@@ -36,6 +36,8 @@ class TestDhcpRpcCallback(base.BaseTestCase):
         set_dirty_p = mock.patch('neutron.quota.resource_registry.'
                                  'set_resources_dirty')
         self.mock_set_dirty = set_dirty_p.start()
+        self.utils_p = mock.patch('neutron.plugins.common.utils.create_port')
+        self.utils = self.utils_p.start()
 
     def test_get_active_networks(self):
         plugin_retval = [dict(id='a'), dict(id='b')]
@@ -79,6 +81,7 @@ class TestDhcpRpcCallback(base.BaseTestCase):
             'fixed_ips': [{'subnet_id': 'foo_subnet_id'}]
         }
         self.plugin.create_port.side_effect = exc
+        self.utils.side_effect = exc
         self.assertIsNone(self.callbacks._port_action(self.plugin,
                                                       mock.Mock(),
                                                       {'port': port},
@@ -87,7 +90,10 @@ class TestDhcpRpcCallback(base.BaseTestCase):
     def _test__port_action_good_action(self, action, port, expected_call):
         self.callbacks._port_action(self.plugin, mock.Mock(),
                                     port, action)
-        self.plugin.assert_has_calls([expected_call])
+        if action == 'create_port':
+            self.utils.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
+        else:
+            self.plugin.assert_has_calls([expected_call])
 
     def test_port_action_create_port(self):
         self._test__port_action_good_action(
index 512fc3022e7e861d69709beff69eb568d83b55b0..7b03a91d35f3a35254b9e38d7d8b342b925e4dc2 100644 (file)
@@ -878,3 +878,90 @@ class TestConvertToList(base.BaseTestCase):
     def test_convert_to_list_non_iterable(self):
         for item in (True, False, 1, 1.2, object()):
             self.assertEqual([item], attributes.convert_to_list(item))
+
+
+class TestResDict(base.BaseTestCase):
+    class _MyException(Exception):
+        pass
+    _EXC_CLS = _MyException
+
+    def _test_fill_default_value(self, attr_info, expected, res_dict):
+        attributes.fill_default_value(attr_info, res_dict)
+        self.assertEqual(expected, res_dict)
+
+    def test_fill_default_value(self):
+        attr_info = {
+            'key': {
+                'allow_post': True,
+                'default': attributes.ATTR_NOT_SPECIFIED,
+            },
+        }
+        self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
+        self._test_fill_default_value(
+            attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {})
+
+        attr_info = {
+            'key': {
+                'allow_post': True,
+            },
+        }
+        self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
+        self.assertRaises(ValueError, self._test_fill_default_value,
+                          attr_info, {'key': 'X'}, {})
+        self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
+                          attr_info, {}, self._EXC_CLS)
+        attr_info = {
+            'key': {
+                'allow_post': False,
+            },
+        }
+        self.assertRaises(ValueError, self._test_fill_default_value,
+                          attr_info, {'key': 'X'}, {'key': 'X'})
+        self._test_fill_default_value(attr_info, {}, {})
+        self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
+                          attr_info, {'key': 'X'}, self._EXC_CLS)
+
+    def _test_convert_value(self, attr_info, expected, res_dict):
+        attributes.convert_value(attr_info, res_dict)
+        self.assertEqual(expected, res_dict)
+
+    def test_convert_value(self):
+        attr_info = {
+            'key': {
+            },
+        }
+        self._test_convert_value(attr_info,
+                                 {'key': attributes.ATTR_NOT_SPECIFIED},
+                                 {'key': attributes.ATTR_NOT_SPECIFIED})
+        self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
+        self._test_convert_value(attr_info,
+                                 {'other_key': 'X'}, {'other_key': 'X'})
+
+        attr_info = {
+            'key': {
+                'convert_to': attributes.convert_to_int,
+            },
+        }
+        self._test_convert_value(attr_info,
+                                 {'key': attributes.ATTR_NOT_SPECIFIED},
+                                 {'key': attributes.ATTR_NOT_SPECIFIED})
+        self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
+        self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
+        self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
+                          attr_info, {'key': 1}, {'key': 'a'})
+
+        attr_info = {
+            'key': {
+                'validate': {'type:uuid': None},
+            },
+        }
+        self._test_convert_value(attr_info,
+                                 {'key': attributes.ATTR_NOT_SPECIFIED},
+                                 {'key': attributes.ATTR_NOT_SPECIFIED})
+        uuid_str = '01234567-1234-1234-1234-1234567890ab'
+        self._test_convert_value(attr_info,
+                                 {'key': uuid_str}, {'key': uuid_str})
+        self.assertRaises(ValueError, self._test_convert_value,
+                          attr_info, {'key': 1}, {'key': 1})
+        self.assertRaises(self._EXC_CLS, attributes.convert_value,
+                          attr_info, {'key': 1}, self._EXC_CLS)