From: Moshe Levi Date: Thu, 9 Jul 2015 10:21:49 +0000 (+0300) Subject: Add versioned object serialize/deserialize for resources RPC X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=d2259240bb06f2e1d82465d3ddc0ee7073795087;p=openstack-build%2Fneutron-build.git Add versioned object serialize/deserialize for resources RPC Also switched RPC callback API to consistently receive resource_type string and not a resource class. This is because for get_info(), we cannot propagate a class thru RPC but only a string that uniquely identifies the class. So it would be not optimal to require the server to discover the corresponding class from the type name passed from the agent. Also updated some comments in api/rpc/callbacks directory to reflect that we handle NeutronObjects, not dicts. Finally, killed the rule resource registration from QoS plugin and the rule type from supported resources since it's YAGNI at least now. Partially-Implements: blueprint quantum-qos-api Change-Id: I5929338953a2ad7fa68312d79394a306eb0164a2 --- diff --git a/neutron/agent/l2/extensions/qos_agent.py b/neutron/agent/l2/extensions/qos_agent.py index d39c60041..16f2e8762 100644 --- a/neutron/agent/l2/extensions/qos_agent.py +++ b/neutron/agent/l2/extensions/qos_agent.py @@ -109,16 +109,18 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension): #TODO(QoS): handle updates when implemented # we have two options: # 1. to add new api for subscribe - # registry.subscribe(self._process_rules_updates, - # resources.QOS_RULES, qos_policy_id) + # registry.subscribe(self._process_policy_updates, + # resources.QOS_POLICY, qos_policy_id) # 2. combine get_info rpc to also subscribe to the resource - qos_rules = self.resource_rpc.get_info( - context, resources.QOS_POLICY, qos_policy_id) - self._process_rules_updates( + qos_policy = self.resource_rpc.get_info( + context, + resources.QOS_POLICY, + qos_policy_id) + self._process_policy_updates( port, resources.QOS_POLICY, qos_policy_id, - qos_rules, 'create') + qos_policy, 'create') - def _process_rules_updates( + def _process_policy_updates( self, port, resource_type, resource_id, - qos_rules, action_type): - getattr(self.qos_driver, action_type)(port, qos_rules) + qos_policy, action_type): + getattr(self.qos_driver, action_type)(port, qos_policy) diff --git a/neutron/api/rpc/callbacks/registry.py b/neutron/api/rpc/callbacks/registry.py index fcf663e5d..931cce20b 100644 --- a/neutron/api/rpc/callbacks/registry.py +++ b/neutron/api/rpc/callbacks/registry.py @@ -27,10 +27,10 @@ def _get_resources_callback_manager(): def get_info(resource_type, resource_id, **kwargs): """Get information about resource type with resource id. - The function will check the providers for an specific remotable + The function will check the providers for a specific remotable resource and get the resource. - :returns: an oslo versioned object. + :returns: NeutronObject """ callback = _get_resources_callback_manager().get_callback(resource_type) if callback: diff --git a/neutron/api/rpc/callbacks/resource_manager.py b/neutron/api/rpc/callbacks/resource_manager.py index 02e940f93..f28326fef 100644 --- a/neutron/api/rpc/callbacks/resource_manager.py +++ b/neutron/api/rpc/callbacks/resource_manager.py @@ -27,43 +27,41 @@ class ResourcesCallbacksManager(object): def __init__(self): self.clear() - def register(self, callback, resource): - """register callback for a resource . + def register(self, callback, resource_type): + """Register a callback for a resource type. - One callback can be register to a resource + Only one callback can be registered for a resource type. - :param callback: the callback. It must raise or return a dict. - :param resource: the resource. It must be a valid resource. + :param callback: the callback. It must raise or return NeutronObject. + :param resource_type: must be a valid resource type. """ - LOG.debug("register: %(callback)s %(resource)s", - {'callback': callback, 'resource': resource}) - if resource not in resources.VALID: - raise exceptions.Invalid(element='resource', value=resource) + LOG.debug("register: %(callback)s %(resource_type)s", + {'callback': callback, 'resource_type': resource_type}) + if not resources.is_valid_resource_type(resource_type): + raise exceptions.Invalid(element='resource', value=resource_type) - self._callbacks[resource] = callback + self._callbacks[resource_type] = callback - def unregister(self, resource): + def unregister(self, resource_type): """Unregister callback from the registry. - :param callback: the callback. - :param resource: the resource. + :param resource: must be a valid resource type. """ - LOG.debug("Unregister: %(resource)s", - {'resource': resource}) - if resource not in resources.VALID: - raise exceptions.Invalid(element='resource', value=resource) - self._callbacks[resource] = None + LOG.debug("Unregister: %s", resource_type) + if not resources.is_valid_resource_type(resource_type): + raise exceptions.Invalid(element='resource', value=resource_type) + self._callbacks[resource_type] = None def clear(self): - """Brings the manager to a clean slate.""" + """Brings the manager to a clean state.""" self._callbacks = collections.defaultdict(dict) - def get_callback(self, resource): + def get_callback(self, resource_type): """Return the callback if found, None otherwise. - :param resource: the resource. It must be a valid resource. + :param resource_type: must be a valid resource type. """ - if resource not in resources.VALID: - raise exceptions.Invalid(element='resource', value=resource) + if not resources.is_valid_resource_type(resource_type): + raise exceptions.Invalid(element='resource', value=resource_type) - return self._callbacks[resource] + return self._callbacks[resource_type] diff --git a/neutron/api/rpc/callbacks/resources.py b/neutron/api/rpc/callbacks/resources.py index 027dde2a1..bde7aed9a 100644 --- a/neutron/api/rpc/callbacks/resources.py +++ b/neutron/api/rpc/callbacks/resources.py @@ -10,10 +10,40 @@ # License for the specific language governing permissions and limitations # under the License. -QOS_POLICY = 'qos-policy' -QOS_RULE = 'qos-rule' +from neutron.objects.qos import policy -VALID = ( - QOS_POLICY, - QOS_RULE, + +_QOS_POLICY_CLS = policy.QosPolicy + +_VALID_CLS = ( + _QOS_POLICY_CLS, ) + +_VALID_TYPES = [cls.obj_name() for cls in _VALID_CLS] + + +# Supported types +QOS_POLICY = _QOS_POLICY_CLS.obj_name() + + +_TYPE_TO_CLS_MAP = { + QOS_POLICY: _QOS_POLICY_CLS, +} + + +def get_resource_type(resource_cls): + if not resource_cls: + return None + + if not hasattr(resource_cls, 'obj_name'): + return None + + return resource_cls.obj_name() + + +def is_valid_resource_type(resource_type): + return resource_type in _VALID_TYPES + + +def get_resource_cls(resource_type): + return _TYPE_TO_CLS_MAP.get(resource_type) diff --git a/neutron/api/rpc/handlers/resources_rpc.py b/neutron/api/rpc/handlers/resources_rpc.py index 68ebc6580..d2869fe86 100755 --- a/neutron/api/rpc/handlers/resources_rpc.py +++ b/neutron/api/rpc/handlers/resources_rpc.py @@ -18,7 +18,9 @@ from oslo_log import log as logging import oslo_messaging from neutron.api.rpc.callbacks import registry +from neutron.api.rpc.callbacks import resources from neutron.common import constants +from neutron.common import exceptions from neutron.common import rpc as n_rpc from neutron.common import topics @@ -26,12 +28,30 @@ from neutron.common import topics LOG = logging.getLogger(__name__) +class ResourcesRpcError(exceptions.NeutronException): + pass + + +class InvalidResourceTypeClass(ResourcesRpcError): + message = _("Invalid resource type %(resource_type)s") + + +class ResourceNotFound(ResourcesRpcError): + message = _("Resource %(resource_id)s of type %(resource_type)s " + "not found") + + +def _validate_resource_type(resource_type): + if not resources.is_valid_resource_type(resource_type): + raise InvalidResourceTypeClass(resource_type=resource_type) + + class ResourcesServerRpcApi(object): """Agent-side RPC (stub) for agent-to-plugin interaction. This class implements the client side of an rpc interface. The server side can be found below: ResourcesServerRpcCallback. For more information on - changing rpc interfaces, see doc/source/devref/rpc_api.rst. + this RPC interface, see doc/source/devref/rpc_callbacks.rst. """ def __init__(self): @@ -42,10 +62,24 @@ class ResourcesServerRpcApi(object): @log_helpers.log_method_call def get_info(self, context, resource_type, resource_id): + _validate_resource_type(resource_type) + + # we've already validated the resource type, so we are pretty sure the + # class is there => no need to validate it specifically + resource_type_cls = resources.get_resource_cls(resource_type) + cctxt = self.client.prepare() - #TODO(Qos): add deserialize version object - return cctxt.call(context, 'get_info', - resource_type=resource_type, resource_id=resource_id) + primitive = cctxt.call(context, 'get_info', + resource_type=resource_type, + version=resource_type_cls.VERSION, resource_id=resource_id) + + if primitive is None: + raise ResourceNotFound(resource_type=resource_type, + resource_id=resource_id) + + obj = resource_type_cls.obj_from_primitive(primitive) + obj.obj_reset_changes() + return obj class ResourcesServerRpcCallback(object): @@ -53,7 +87,7 @@ class ResourcesServerRpcCallback(object): This class implements the server side of an rpc interface. The client side can be found above: ResourcesServerRpcApi. For more information on - changing rpc interfaces, see doc/source/devref/rpc_api.rst. + this RPC interface, see doc/source/devref/rpc_callbacks.rst. """ # History @@ -62,10 +96,13 @@ class ResourcesServerRpcCallback(object): target = oslo_messaging.Target( version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES) - def get_info(self, context, resource_type, resource_id): - kwargs = {'context': context} - #TODO(Qos): add serialize version object - return registry.get_info( + def get_info(self, context, resource_type, version, resource_id): + _validate_resource_type(resource_type) + + obj = registry.get_info( resource_type, resource_id, - **kwargs) + context=context) + + if obj: + return obj.obj_to_primitive(target_version=version) diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index f1d9a1470..ac0e360a4 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -30,33 +30,14 @@ from oslo_log import log as logging LOG = logging.getLogger(__name__) -#TODO(QoS): remove this stub when db is ready -def _get_qos_policy_cb_stub(resource, policy_id, **kwargs): - """Hardcoded stub for testing until we get the db working.""" - qos_policy = { - "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04", - "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4", - "name": "10Mbit", - "description": "This policy limits the ports to 10Mbit max.", - "shared": False, - "rules": [{ - "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793", - "max_kbps": "10000", - "max_burst_kbps": "0", - "type": "bandwidth_limit" - }] - } - return qos_policy - - -def _get_qos_policy_cb(resource, policy_id, **kwargs): +def _get_qos_policy_cb(resource_type, policy_id, **kwargs): qos_plugin = manager.NeutronManager.get_service_plugins().get( constants.QOS) context = kwargs.get('context') if context is None: LOG.warning(_LW( - 'Received %(resource)s %(policy_id)s without context'), - {'resource': resource, 'policy_id': policy_id} + 'Received %(resource_type)s %(policy_id)s without context'), + {'resource_type': resource_type, 'policy_id': policy_id} ) return @@ -64,35 +45,6 @@ def _get_qos_policy_cb(resource, policy_id, **kwargs): return qos_policy -#TODO(QoS): remove this stub when db is ready -def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs): - """Hardcoded for testing until we get the db working.""" - bandwidth_limit = { - "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793", - "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4", - "max_kbps": "10000", - "max_burst_kbps": "0", - } - return bandwidth_limit - - -def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs): - qos_plugin = manager.NeutronManager.get_service_plugins().get( - constants.QOS) - context = kwargs.get('context') - if context is None: - LOG.warning(_LW( - 'Received %(resource)s %(rule_id,)s without context '), - {'resource': resource, 'rule_id,': rule_id} - ) - return - - bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule( - context, - rule_id) - return bandwidth_limit - - class QoSPlugin(qos.QoSPluginBase): """Implementation of the Neutron QoS Service Plugin. @@ -105,15 +57,8 @@ class QoSPlugin(qos.QoSPluginBase): def __init__(self): super(QoSPlugin, self).__init__() - self.register_resource_providers() - - def register_resource_providers(self): - rpc_registry.register_provider( - _get_qos_bandwidth_limit_rule_cb_stub, - rpc_resources.QOS_RULE) - rpc_registry.register_provider( - _get_qos_policy_cb_stub, + _get_qos_policy_cb, rpc_resources.QOS_POLICY) def create_policy(self, context, policy): diff --git a/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py index f68e02da7..7e9f58898 100644 --- a/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py +++ b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py @@ -44,20 +44,6 @@ class ResourcesCallbackRequestTestCase(base.BaseTestCase): } return qos_policy - #TODO(QoS) convert it to the version object format - def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs): - bandwidth_limit = { - "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793", - "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4", - "max_kbps": "10000", - "max_burst_kbps": "0", - } - return bandwidth_limit - - rpc_registry.register_provider( - _get_qos_bandwidth_limit_rule_cb, - resources.QOS_RULE) - rpc_registry.register_provider( _get_qos_policy_cb, resources.QOS_POLICY) @@ -70,9 +56,3 @@ class ResourcesCallbackRequestTestCase(base.BaseTestCase): self.resource_id, **kwargs) self.assertEqual(self.resource_id, qos_policy['id']) - - qos_rule = rpc_registry.get_info( - resources.QOS_RULE, - self.qos_rule_id, - **kwargs) - self.assertEqual(self.qos_rule_id, qos_rule['id']) diff --git a/neutron/tests/unit/api/rpc/callbacks/test_resources.py b/neutron/tests/unit/api/rpc/callbacks/test_resources.py new file mode 100644 index 000000000..78d8e5d82 --- /dev/null +++ b/neutron/tests/unit/api/rpc/callbacks/test_resources.py @@ -0,0 +1,54 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.api.rpc.callbacks import resources +from neutron.objects.qos import policy +from neutron.tests import base + + +class GetResourceTypeTestCase(base.BaseTestCase): + + def test_get_resource_type_none(self): + self.assertIsNone(resources.get_resource_type(None)) + + def test_get_resource_type_wrong_type(self): + self.assertIsNone(resources.get_resource_type(object())) + + def test_get_resource_type(self): + # we could use any other registered NeutronObject type here + self.assertEqual(policy.QosPolicy.obj_name(), + resources.get_resource_type(policy.QosPolicy())) + + +class IsValidResourceTypeTestCase(base.BaseTestCase): + + def test_known_type(self): + # it could be any other NeutronObject, assuming it's known to RPC + # callbacks + self.assertTrue(resources.is_valid_resource_type( + policy.QosPolicy.obj_name())) + + def test_unknown_type(self): + self.assertFalse( + resources.is_valid_resource_type('unknown-resource-type')) + + +class GetResourceClsTestCase(base.BaseTestCase): + + def test_known_type(self): + # it could be any other NeutronObject, assuming it's known to RPC + # callbacks + self.assertEqual(policy.QosPolicy, + resources.get_resource_cls(resources.QOS_POLICY)) + + def test_unknown_type(self): + self.assertIsNone(resources.get_resource_cls('unknown-resource-type')) diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py new file mode 100755 index 000000000..347c2a3d0 --- /dev/null +++ b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py @@ -0,0 +1,101 @@ +# Copyright (c) 2015 Mellanox Technologies, Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from oslo_utils import uuidutils + +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron import context +from neutron.objects.qos import policy +from neutron.tests import base + + +class ResourcesRpcBaseTestCase(base.BaseTestCase): + + def setUp(self): + super(ResourcesRpcBaseTestCase, self).setUp() + self.context = context.get_admin_context() + + def _create_test_policy_dict(self): + return {'id': uuidutils.generate_uuid(), + 'tenant_id': uuidutils.generate_uuid(), + 'name': 'test', + 'description': 'test', + 'shared': False} + + def _create_test_policy(self, policy_dict): + policy_obj = policy.QosPolicy(self.context, **policy_dict) + policy_obj.obj_reset_changes() + return policy_obj + + +class ResourcesServerRpcApiTestCase(ResourcesRpcBaseTestCase): + + def setUp(self): + super(ResourcesServerRpcApiTestCase, self).setUp() + self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client') + self.client = self.client_p.start() + self.rpc = resources_rpc.ResourcesServerRpcApi() + self.mock_cctxt = self.rpc.client.prepare.return_value + + def test_get_info(self): + policy_dict = self._create_test_policy_dict() + expected_policy_obj = self._create_test_policy(policy_dict) + qos_policy_id = policy_dict['id'] + self.mock_cctxt.call.return_value = ( + expected_policy_obj.obj_to_primitive()) + get_info_result = self.rpc.get_info( + self.context, resources.QOS_POLICY, qos_policy_id) + self.mock_cctxt.call.assert_called_once_with( + self.context, 'get_info', resource_type=resources.QOS_POLICY, + version=policy.QosPolicy.VERSION, resource_id=qos_policy_id) + self.assertEqual(expected_policy_obj, get_info_result) + + def test_get_info_invalid_resource_type_cls(self): + self.assertRaises( + resources_rpc.InvalidResourceTypeClass, self.rpc.get_info, + self.context, 'foo_type', 'foo_id') + + def test_get_info_resource_not_found(self): + policy_dict = self._create_test_policy_dict() + qos_policy_id = policy_dict['id'] + self.mock_cctxt.call.return_value = None + self.assertRaises( + resources_rpc.ResourceNotFound, self.rpc.get_info, self.context, + resources.QOS_POLICY, qos_policy_id) + + +class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase): + + def setUp(self): + super(ResourcesServerRpcCallbackTestCase, self).setUp() + self.callbacks = resources_rpc.ResourcesServerRpcCallback() + + def test_get_info(self): + policy_dict = self._create_test_policy_dict() + policy_obj = self._create_test_policy(policy_dict) + qos_policy_id = policy_dict['id'] + with mock.patch.object(resources_rpc.registry, 'get_info', + return_value=policy_obj) as registry_mock: + primitive = self.callbacks.get_info( + self.context, resource_type=resources.QOS_POLICY, + version=policy.QosPolicy.VERSION, + resource_id=qos_policy_id) + registry_mock.assert_called_once_with( + resources.QOS_POLICY, + qos_policy_id, context=self.context) + self.assertEqual(policy_dict, primitive['versioned_object.data']) + self.assertEqual(policy_obj.obj_to_primitive(), primitive)