Enforce appropriate type for the object returned by rpc callback.
Partially-Implements: blueprint quantum-qos-api
Change-Id: I994253ac15320254104862d2df8dacfc7fc00014
# under the License.
from neutron.api.rpc.callbacks import resource_manager
+from neutron.api.rpc.callbacks import resources
+from neutron.common import exceptions
+
# TODO(ajo): consider adding locking
CALLBACK_MANAGER = None
return CALLBACK_MANAGER
+class CallbackReturnedWrongObjectType(exceptions.NeutronException):
+ message = _('Callback for %(resource_type)s returned wrong object type')
+
+
#resource implementation callback registration functions
def get_info(resource_type, resource_id, **kwargs):
"""Get information about resource type with resource id.
"""
callback = _get_resources_callback_manager().get_callback(resource_type)
if callback:
- return callback(resource_type, resource_id, **kwargs)
+ obj = callback(resource_type, resource_id, **kwargs)
+ if obj:
+ expected_cls = resources.get_resource_cls(resource_type)
+ if not isinstance(obj, expected_cls):
+ raise CallbackReturnedWrongObjectType(
+ resource_type=resource_type)
+ return obj
def register_provider(callback, resource_type):
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.api.rpc.callbacks import registry
+from neutron.api.rpc.callbacks import resource_manager
+from neutron.api.rpc.callbacks import resources
+from neutron.objects.qos import policy
+from neutron.tests import base
+
+
+class GetInfoTestCase(base.BaseTestCase):
+ def setUp(self):
+ super(GetInfoTestCase, self).setUp()
+ mgr = resource_manager.ResourcesCallbacksManager()
+ mgr_p = mock.patch.object(
+ registry, '_get_resources_callback_manager', return_value=mgr)
+ mgr_p.start()
+
+ def test_returns_callback_result(self):
+ policy_obj = policy.QosPolicy(context=None)
+
+ def _fake_policy_cb(*args, **kwargs):
+ return policy_obj
+
+ registry.register_provider(_fake_policy_cb, resources.QOS_POLICY)
+
+ self.assertEqual(policy_obj,
+ registry.get_info(resources.QOS_POLICY, 'fake_id'))
+
+ def test_does_not_raise_on_none(self):
+ def _wrong_type_cb(*args, **kwargs):
+ pass
+
+ registry.register_provider(_wrong_type_cb, resources.QOS_POLICY)
+
+ obj = registry.get_info(resources.QOS_POLICY, 'fake_id')
+ self.assertIsNone(obj)
+
+ def test_raises_on_wrong_object_type(self):
+ def _wrong_type_cb(*args, **kwargs):
+ return object()
+
+ registry.register_provider(_wrong_type_cb, resources.QOS_POLICY)
+
+ self.assertRaises(
+ registry.CallbackReturnedWrongObjectType,
+ registry.get_info, resources.QOS_POLICY, 'fake_id')
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources
+from neutron.objects.qos import policy
+from neutron.objects.qos import rule
from neutron.tests import base
def test_resource_callback_request(self):
- #TODO(QoS) convert it to the version object format
def _get_qos_policy_cb(resource, policy_id, **kwargs):
- qos_policy = {
- "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
- "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
- "name": "10Mbit",
- "description": "This policy limits the ports to 10Mbit max.",
- "shared": False,
- "rules": [{
- "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
- "max_kbps": "10000",
- "max_burst_kbps": "0",
- "type": "bnadwidth_limit"
- }]
- }
+ context = kwargs.get('context')
+ qos_policy = policy.QosPolicy(context,
+ tenant_id="8d4c70a21fed4aeba121a1a429ba0d04",
+ id="46ebaec0-0570-43ac-82f6-60d2b03168c4",
+ name="10Mbit",
+ description="This policy limits the ports to 10Mbit max.",
+ shared=False,
+ rules=[
+ rule.QosBandwidthLimitRule(context,
+ id="5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+ max_kbps=10000,
+ max_burst_kbps=0)
+ ]
+ )
+ qos_policy.obj_reset_changes()
return qos_policy
rpc_registry.register_provider(