]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add versioned object serialize/deserialize for resources RPC
authorMoshe Levi <moshele@mellanox.com>
Thu, 9 Jul 2015 10:21:49 +0000 (13:21 +0300)
committerIhar Hrachyshka <ihrachys@redhat.com>
Sat, 25 Jul 2015 06:51:39 +0000 (08:51 +0200)
Also switched RPC callback API to consistently receive resource_type
string and not a resource class. This is because for get_info(), we
cannot propagate a class thru RPC but only a string that uniquely
identifies the class. So it would be not optimal to require the server
to discover the corresponding class from the type name passed from the
agent.

Also updated some comments in api/rpc/callbacks directory to reflect
that we handle NeutronObjects, not dicts.

Finally, killed the rule resource registration from QoS plugin and the
rule type from supported resources since it's YAGNI at least now.

Partially-Implements: blueprint quantum-qos-api
Change-Id: I5929338953a2ad7fa68312d79394a306eb0164a2

neutron/agent/l2/extensions/qos_agent.py
neutron/api/rpc/callbacks/registry.py
neutron/api/rpc/callbacks/resource_manager.py
neutron/api/rpc/callbacks/resources.py
neutron/api/rpc/handlers/resources_rpc.py
neutron/services/qos/qos_plugin.py
neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py
neutron/tests/unit/api/rpc/callbacks/test_resources.py [new file with mode: 0644]
neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py [new file with mode: 0755]

index d39c60041acfacae875f77c92d58aec9624720b5..16f2e8762276d28752158ff34390a333746d7be2 100644 (file)
@@ -109,16 +109,18 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
         #TODO(QoS): handle updates when implemented
         # we have two options:
         # 1. to add new api for subscribe
-        #    registry.subscribe(self._process_rules_updates,
-        #                   resources.QOS_RULES, qos_policy_id)
+        #    registry.subscribe(self._process_policy_updates,
+        #                   resources.QOS_POLICY, qos_policy_id)
         # 2. combine get_info rpc to also subscribe to the resource
-        qos_rules = self.resource_rpc.get_info(
-            context, resources.QOS_POLICY, qos_policy_id)
-        self._process_rules_updates(
+        qos_policy = self.resource_rpc.get_info(
+            context,
+            resources.QOS_POLICY,
+            qos_policy_id)
+        self._process_policy_updates(
             port, resources.QOS_POLICY, qos_policy_id,
-            qos_rules, 'create')
+            qos_policy, 'create')
 
-    def _process_rules_updates(
+    def _process_policy_updates(
             self, port, resource_type, resource_id,
-            qos_rules, action_type):
-        getattr(self.qos_driver, action_type)(port, qos_rules)
+            qos_policy, action_type):
+        getattr(self.qos_driver, action_type)(port, qos_policy)
index fcf663e5d765bb59a36d4932638eaab0638cfb25..931cce20be6c188e0f8e5109505b491ef672fdfa 100644 (file)
@@ -27,10 +27,10 @@ def _get_resources_callback_manager():
 def get_info(resource_type, resource_id, **kwargs):
     """Get information about resource type with resource id.
 
-    The function will check the providers for an specific remotable
+    The function will check the providers for a specific remotable
     resource and get the resource.
 
-    :returns: an oslo versioned object.
+    :returns: NeutronObject
     """
     callback = _get_resources_callback_manager().get_callback(resource_type)
     if callback:
index 02e940f93e3cebcfe3c717c0aab8f997b95473fc..f28326fef7242e295f3567ddad4ba414f0ec07bb 100644 (file)
@@ -27,43 +27,41 @@ class ResourcesCallbacksManager(object):
     def __init__(self):
         self.clear()
 
-    def register(self, callback, resource):
-        """register callback for a resource .
+    def register(self, callback, resource_type):
+        """Register a callback for a resource type.
 
-        One callback can be register to a resource
+        Only one callback can be registered for a resource type.
 
-        :param callback: the callback. It must raise or return a dict.
-        :param resource: the resource. It must be a valid resource.
+        :param callback: the callback. It must raise or return NeutronObject.
+        :param resource_type: must be a valid resource type.
         """
-        LOG.debug("register: %(callback)s %(resource)s",
-                  {'callback': callback, 'resource': resource})
-        if resource not in resources.VALID:
-            raise exceptions.Invalid(element='resource', value=resource)
+        LOG.debug("register: %(callback)s %(resource_type)s",
+                  {'callback': callback, 'resource_type': resource_type})
+        if not resources.is_valid_resource_type(resource_type):
+            raise exceptions.Invalid(element='resource', value=resource_type)
 
-        self._callbacks[resource] = callback
+        self._callbacks[resource_type] = callback
 
-    def unregister(self, resource):
+    def unregister(self, resource_type):
         """Unregister callback from the registry.
 
-        :param callback: the callback.
-        :param resource: the resource.
+        :param resource: must be a valid resource type.
         """
-        LOG.debug("Unregister: %(resource)s",
-                  {'resource': resource})
-        if resource not in resources.VALID:
-            raise exceptions.Invalid(element='resource', value=resource)
-        self._callbacks[resource] = None
+        LOG.debug("Unregister: %s", resource_type)
+        if not resources.is_valid_resource_type(resource_type):
+            raise exceptions.Invalid(element='resource', value=resource_type)
+        self._callbacks[resource_type] = None
 
     def clear(self):
-        """Brings the manager to a clean slate."""
+        """Brings the manager to a clean state."""
         self._callbacks = collections.defaultdict(dict)
 
-    def get_callback(self, resource):
+    def get_callback(self, resource_type):
         """Return the callback if found, None otherwise.
 
-        :param resource: the resource. It must be a valid resource.
+        :param resource_type: must be a valid resource type.
         """
-        if resource not in resources.VALID:
-            raise exceptions.Invalid(element='resource', value=resource)
+        if not resources.is_valid_resource_type(resource_type):
+            raise exceptions.Invalid(element='resource', value=resource_type)
 
-        return self._callbacks[resource]
+        return self._callbacks[resource_type]
index 027dde2a16affb42dafbabfcda92bc756e7861cc..bde7aed9a7e499a66e8d13352c0f0a08a19bd849 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-QOS_POLICY = 'qos-policy'
-QOS_RULE = 'qos-rule'
+from neutron.objects.qos import policy
 
-VALID = (
-    QOS_POLICY,
-    QOS_RULE,
+
+_QOS_POLICY_CLS = policy.QosPolicy
+
+_VALID_CLS = (
+    _QOS_POLICY_CLS,
 )
+
+_VALID_TYPES = [cls.obj_name() for cls in _VALID_CLS]
+
+
+# Supported types
+QOS_POLICY = _QOS_POLICY_CLS.obj_name()
+
+
+_TYPE_TO_CLS_MAP = {
+    QOS_POLICY: _QOS_POLICY_CLS,
+}
+
+
+def get_resource_type(resource_cls):
+    if not resource_cls:
+        return None
+
+    if not hasattr(resource_cls, 'obj_name'):
+        return None
+
+    return resource_cls.obj_name()
+
+
+def is_valid_resource_type(resource_type):
+    return resource_type in _VALID_TYPES
+
+
+def get_resource_cls(resource_type):
+    return _TYPE_TO_CLS_MAP.get(resource_type)
index 68ebc6580d302f058773931b5503801e7db04eda..d2869fe86759d91a09951f7b15fb88f7736e2f9e 100755 (executable)
@@ -18,7 +18,9 @@ from oslo_log import log as logging
 import oslo_messaging
 
 from neutron.api.rpc.callbacks import registry
+from neutron.api.rpc.callbacks import resources
 from neutron.common import constants
+from neutron.common import exceptions
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
 
@@ -26,12 +28,30 @@ from neutron.common import topics
 LOG = logging.getLogger(__name__)
 
 
+class ResourcesRpcError(exceptions.NeutronException):
+    pass
+
+
+class InvalidResourceTypeClass(ResourcesRpcError):
+    message = _("Invalid resource type %(resource_type)s")
+
+
+class ResourceNotFound(ResourcesRpcError):
+    message = _("Resource %(resource_id)s of type %(resource_type)s "
+                "not found")
+
+
+def _validate_resource_type(resource_type):
+    if not resources.is_valid_resource_type(resource_type):
+        raise InvalidResourceTypeClass(resource_type=resource_type)
+
+
 class ResourcesServerRpcApi(object):
     """Agent-side RPC (stub) for agent-to-plugin interaction.
 
     This class implements the client side of an rpc interface.  The server side
     can be found below: ResourcesServerRpcCallback.  For more information on
-    changing rpc interfaces, see doc/source/devref/rpc_api.rst.
+    this RPC interface, see doc/source/devref/rpc_callbacks.rst.
     """
 
     def __init__(self):
@@ -42,10 +62,24 @@ class ResourcesServerRpcApi(object):
 
     @log_helpers.log_method_call
     def get_info(self, context, resource_type, resource_id):
+        _validate_resource_type(resource_type)
+
+        # we've already validated the resource type, so we are pretty sure the
+        # class is there => no need to validate it specifically
+        resource_type_cls = resources.get_resource_cls(resource_type)
+
         cctxt = self.client.prepare()
-        #TODO(Qos): add deserialize version object
-        return cctxt.call(context, 'get_info',
-            resource_type=resource_type, resource_id=resource_id)
+        primitive = cctxt.call(context, 'get_info',
+            resource_type=resource_type,
+            version=resource_type_cls.VERSION, resource_id=resource_id)
+
+        if primitive is None:
+            raise ResourceNotFound(resource_type=resource_type,
+                                   resource_id=resource_id)
+
+        obj = resource_type_cls.obj_from_primitive(primitive)
+        obj.obj_reset_changes()
+        return obj
 
 
 class ResourcesServerRpcCallback(object):
@@ -53,7 +87,7 @@ class ResourcesServerRpcCallback(object):
 
     This class implements the server side of an rpc interface.  The client side
     can be found above: ResourcesServerRpcApi.  For more information on
-    changing rpc interfaces, see doc/source/devref/rpc_api.rst.
+    this RPC interface, see doc/source/devref/rpc_callbacks.rst.
     """
 
     # History
@@ -62,10 +96,13 @@ class ResourcesServerRpcCallback(object):
     target = oslo_messaging.Target(
         version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
 
-    def get_info(self, context, resource_type, resource_id):
-        kwargs = {'context': context}
-        #TODO(Qos): add serialize  version object
-        return registry.get_info(
+    def get_info(self, context, resource_type, version, resource_id):
+        _validate_resource_type(resource_type)
+
+        obj = registry.get_info(
             resource_type,
             resource_id,
-            **kwargs)
+            context=context)
+
+        if obj:
+            return obj.obj_to_primitive(target_version=version)
index f1d9a147021ead5807bb543f45cbee7162ad3d2f..ac0e360a4c71a45a6f9a1abc637f7d4c67e40e67 100644 (file)
@@ -30,33 +30,14 @@ from oslo_log import log as logging
 LOG = logging.getLogger(__name__)
 
 
-#TODO(QoS): remove this stub when db is ready
-def _get_qos_policy_cb_stub(resource, policy_id, **kwargs):
-    """Hardcoded stub for testing until we get the db working."""
-    qos_policy = {
-        "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
-        "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
-        "name": "10Mbit",
-        "description": "This policy limits the ports to 10Mbit max.",
-        "shared": False,
-        "rules": [{
-            "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
-            "max_kbps": "10000",
-            "max_burst_kbps": "0",
-            "type": "bandwidth_limit"
-        }]
-    }
-    return qos_policy
-
-
-def _get_qos_policy_cb(resource, policy_id, **kwargs):
+def _get_qos_policy_cb(resource_type, policy_id, **kwargs):
     qos_plugin = manager.NeutronManager.get_service_plugins().get(
         constants.QOS)
     context = kwargs.get('context')
     if context is None:
         LOG.warning(_LW(
-            'Received %(resource)s %(policy_id)s without context'),
-            {'resource': resource, 'policy_id': policy_id}
+            'Received %(resource_type)s %(policy_id)s without context'),
+            {'resource_type': resource_type, 'policy_id': policy_id}
         )
         return
 
@@ -64,35 +45,6 @@ def _get_qos_policy_cb(resource, policy_id, **kwargs):
     return qos_policy
 
 
-#TODO(QoS): remove this stub when db is ready
-def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs):
-    """Hardcoded for testing until we get the db working."""
-    bandwidth_limit = {
-        "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
-        "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
-        "max_kbps": "10000",
-        "max_burst_kbps": "0",
-    }
-    return bandwidth_limit
-
-
-def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
-    qos_plugin = manager.NeutronManager.get_service_plugins().get(
-        constants.QOS)
-    context = kwargs.get('context')
-    if context is None:
-        LOG.warning(_LW(
-            'Received %(resource)s %(rule_id,)s without context '),
-            {'resource': resource, 'rule_id,': rule_id}
-        )
-        return
-
-    bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule(
-                                        context,
-                                        rule_id)
-    return bandwidth_limit
-
-
 class QoSPlugin(qos.QoSPluginBase):
     """Implementation of the Neutron QoS Service Plugin.
 
@@ -105,15 +57,8 @@ class QoSPlugin(qos.QoSPluginBase):
 
     def __init__(self):
         super(QoSPlugin, self).__init__()
-        self.register_resource_providers()
-
-    def register_resource_providers(self):
-        rpc_registry.register_provider(
-            _get_qos_bandwidth_limit_rule_cb_stub,
-            rpc_resources.QOS_RULE)
-
         rpc_registry.register_provider(
-            _get_qos_policy_cb_stub,
+            _get_qos_policy_cb,
             rpc_resources.QOS_POLICY)
 
     def create_policy(self, context, policy):
index f68e02da7ffea5d9530c241c826e93ce6977262b..7e9f58898453706fabc8f3027c483178ae113320 100644 (file)
@@ -44,20 +44,6 @@ class ResourcesCallbackRequestTestCase(base.BaseTestCase):
             }
             return qos_policy
 
-        #TODO(QoS) convert it to the version object format
-        def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
-            bandwidth_limit = {
-                "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
-                "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
-                "max_kbps": "10000",
-                "max_burst_kbps": "0",
-            }
-            return bandwidth_limit
-
-        rpc_registry.register_provider(
-                        _get_qos_bandwidth_limit_rule_cb,
-                        resources.QOS_RULE)
-
         rpc_registry.register_provider(
             _get_qos_policy_cb,
             resources.QOS_POLICY)
@@ -70,9 +56,3 @@ class ResourcesCallbackRequestTestCase(base.BaseTestCase):
             self.resource_id,
             **kwargs)
         self.assertEqual(self.resource_id, qos_policy['id'])
-
-        qos_rule = rpc_registry.get_info(
-            resources.QOS_RULE,
-            self.qos_rule_id,
-            **kwargs)
-        self.assertEqual(self.qos_rule_id, qos_rule['id'])
diff --git a/neutron/tests/unit/api/rpc/callbacks/test_resources.py b/neutron/tests/unit/api/rpc/callbacks/test_resources.py
new file mode 100644 (file)
index 0000000..78d8e5d
--- /dev/null
@@ -0,0 +1,54 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.api.rpc.callbacks import resources
+from neutron.objects.qos import policy
+from neutron.tests import base
+
+
+class GetResourceTypeTestCase(base.BaseTestCase):
+
+    def test_get_resource_type_none(self):
+        self.assertIsNone(resources.get_resource_type(None))
+
+    def test_get_resource_type_wrong_type(self):
+        self.assertIsNone(resources.get_resource_type(object()))
+
+    def test_get_resource_type(self):
+        # we could use any other registered NeutronObject type here
+        self.assertEqual(policy.QosPolicy.obj_name(),
+                         resources.get_resource_type(policy.QosPolicy()))
+
+
+class IsValidResourceTypeTestCase(base.BaseTestCase):
+
+    def test_known_type(self):
+        # it could be any other NeutronObject, assuming it's known to RPC
+        # callbacks
+        self.assertTrue(resources.is_valid_resource_type(
+            policy.QosPolicy.obj_name()))
+
+    def test_unknown_type(self):
+        self.assertFalse(
+            resources.is_valid_resource_type('unknown-resource-type'))
+
+
+class GetResourceClsTestCase(base.BaseTestCase):
+
+    def test_known_type(self):
+        # it could be any other NeutronObject, assuming it's known to RPC
+        # callbacks
+        self.assertEqual(policy.QosPolicy,
+                         resources.get_resource_cls(resources.QOS_POLICY))
+
+    def test_unknown_type(self):
+        self.assertIsNone(resources.get_resource_cls('unknown-resource-type'))
diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py
new file mode 100755 (executable)
index 0000000..347c2a3
--- /dev/null
@@ -0,0 +1,101 @@
+# Copyright (c) 2015 Mellanox Technologies, Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from oslo_utils import uuidutils
+
+from neutron.api.rpc.callbacks import resources
+from neutron.api.rpc.handlers import resources_rpc
+from neutron import context
+from neutron.objects.qos import policy
+from neutron.tests import base
+
+
+class ResourcesRpcBaseTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        super(ResourcesRpcBaseTestCase, self).setUp()
+        self.context = context.get_admin_context()
+
+    def _create_test_policy_dict(self):
+        return {'id': uuidutils.generate_uuid(),
+                'tenant_id': uuidutils.generate_uuid(),
+                'name': 'test',
+                'description': 'test',
+                'shared': False}
+
+    def _create_test_policy(self, policy_dict):
+        policy_obj = policy.QosPolicy(self.context, **policy_dict)
+        policy_obj.obj_reset_changes()
+        return policy_obj
+
+
+class ResourcesServerRpcApiTestCase(ResourcesRpcBaseTestCase):
+
+    def setUp(self):
+        super(ResourcesServerRpcApiTestCase, self).setUp()
+        self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
+        self.client = self.client_p.start()
+        self.rpc = resources_rpc.ResourcesServerRpcApi()
+        self.mock_cctxt = self.rpc.client.prepare.return_value
+
+    def test_get_info(self):
+        policy_dict = self._create_test_policy_dict()
+        expected_policy_obj = self._create_test_policy(policy_dict)
+        qos_policy_id = policy_dict['id']
+        self.mock_cctxt.call.return_value = (
+            expected_policy_obj.obj_to_primitive())
+        get_info_result = self.rpc.get_info(
+            self.context, resources.QOS_POLICY, qos_policy_id)
+        self.mock_cctxt.call.assert_called_once_with(
+            self.context, 'get_info', resource_type=resources.QOS_POLICY,
+            version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
+        self.assertEqual(expected_policy_obj, get_info_result)
+
+    def test_get_info_invalid_resource_type_cls(self):
+        self.assertRaises(
+            resources_rpc.InvalidResourceTypeClass, self.rpc.get_info,
+            self.context, 'foo_type', 'foo_id')
+
+    def test_get_info_resource_not_found(self):
+        policy_dict = self._create_test_policy_dict()
+        qos_policy_id = policy_dict['id']
+        self.mock_cctxt.call.return_value = None
+        self.assertRaises(
+            resources_rpc.ResourceNotFound, self.rpc.get_info, self.context,
+            resources.QOS_POLICY, qos_policy_id)
+
+
+class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
+
+    def setUp(self):
+        super(ResourcesServerRpcCallbackTestCase, self).setUp()
+        self.callbacks = resources_rpc.ResourcesServerRpcCallback()
+
+    def test_get_info(self):
+        policy_dict = self._create_test_policy_dict()
+        policy_obj = self._create_test_policy(policy_dict)
+        qos_policy_id = policy_dict['id']
+        with mock.patch.object(resources_rpc.registry, 'get_info',
+                               return_value=policy_obj) as registry_mock:
+            primitive = self.callbacks.get_info(
+                self.context, resource_type=resources.QOS_POLICY,
+                version=policy.QosPolicy.VERSION,
+                resource_id=qos_policy_id)
+            registry_mock.assert_called_once_with(
+                resources.QOS_POLICY,
+                qos_policy_id, context=self.context)
+        self.assertEqual(policy_dict, primitive['versioned_object.data'])
+        self.assertEqual(policy_obj.obj_to_primitive(), primitive)