#TODO(QoS): handle updates when implemented
# we have two options:
# 1. to add new api for subscribe
- # registry.subscribe(self._process_rules_updates,
- # resources.QOS_RULES, qos_policy_id)
+ # registry.subscribe(self._process_policy_updates,
+ # resources.QOS_POLICY, qos_policy_id)
# 2. combine get_info rpc to also subscribe to the resource
- qos_rules = self.resource_rpc.get_info(
- context, resources.QOS_POLICY, qos_policy_id)
- self._process_rules_updates(
+ qos_policy = self.resource_rpc.get_info(
+ context,
+ resources.QOS_POLICY,
+ qos_policy_id)
+ self._process_policy_updates(
port, resources.QOS_POLICY, qos_policy_id,
- qos_rules, 'create')
+ qos_policy, 'create')
- def _process_rules_updates(
+ def _process_policy_updates(
self, port, resource_type, resource_id,
- qos_rules, action_type):
- getattr(self.qos_driver, action_type)(port, qos_rules)
+ qos_policy, action_type):
+ getattr(self.qos_driver, action_type)(port, qos_policy)
def get_info(resource_type, resource_id, **kwargs):
"""Get information about resource type with resource id.
- The function will check the providers for an specific remotable
+ The function will check the providers for a specific remotable
resource and get the resource.
- :returns: an oslo versioned object.
+ :returns: NeutronObject
"""
callback = _get_resources_callback_manager().get_callback(resource_type)
if callback:
def __init__(self):
self.clear()
- def register(self, callback, resource):
- """register callback for a resource .
+ def register(self, callback, resource_type):
+ """Register a callback for a resource type.
- One callback can be register to a resource
+ Only one callback can be registered for a resource type.
- :param callback: the callback. It must raise or return a dict.
- :param resource: the resource. It must be a valid resource.
+ :param callback: the callback. It must raise or return NeutronObject.
+ :param resource_type: must be a valid resource type.
"""
- LOG.debug("register: %(callback)s %(resource)s",
- {'callback': callback, 'resource': resource})
- if resource not in resources.VALID:
- raise exceptions.Invalid(element='resource', value=resource)
+ LOG.debug("register: %(callback)s %(resource_type)s",
+ {'callback': callback, 'resource_type': resource_type})
+ if not resources.is_valid_resource_type(resource_type):
+ raise exceptions.Invalid(element='resource', value=resource_type)
- self._callbacks[resource] = callback
+ self._callbacks[resource_type] = callback
- def unregister(self, resource):
+ def unregister(self, resource_type):
"""Unregister callback from the registry.
- :param callback: the callback.
- :param resource: the resource.
+ :param resource: must be a valid resource type.
"""
- LOG.debug("Unregister: %(resource)s",
- {'resource': resource})
- if resource not in resources.VALID:
- raise exceptions.Invalid(element='resource', value=resource)
- self._callbacks[resource] = None
+ LOG.debug("Unregister: %s", resource_type)
+ if not resources.is_valid_resource_type(resource_type):
+ raise exceptions.Invalid(element='resource', value=resource_type)
+ self._callbacks[resource_type] = None
def clear(self):
- """Brings the manager to a clean slate."""
+ """Brings the manager to a clean state."""
self._callbacks = collections.defaultdict(dict)
- def get_callback(self, resource):
+ def get_callback(self, resource_type):
"""Return the callback if found, None otherwise.
- :param resource: the resource. It must be a valid resource.
+ :param resource_type: must be a valid resource type.
"""
- if resource not in resources.VALID:
- raise exceptions.Invalid(element='resource', value=resource)
+ if not resources.is_valid_resource_type(resource_type):
+ raise exceptions.Invalid(element='resource', value=resource_type)
- return self._callbacks[resource]
+ return self._callbacks[resource_type]
# License for the specific language governing permissions and limitations
# under the License.
-QOS_POLICY = 'qos-policy'
-QOS_RULE = 'qos-rule'
+from neutron.objects.qos import policy
-VALID = (
- QOS_POLICY,
- QOS_RULE,
+
+_QOS_POLICY_CLS = policy.QosPolicy
+
+_VALID_CLS = (
+ _QOS_POLICY_CLS,
)
+
+_VALID_TYPES = [cls.obj_name() for cls in _VALID_CLS]
+
+
+# Supported types
+QOS_POLICY = _QOS_POLICY_CLS.obj_name()
+
+
+_TYPE_TO_CLS_MAP = {
+ QOS_POLICY: _QOS_POLICY_CLS,
+}
+
+
+def get_resource_type(resource_cls):
+ if not resource_cls:
+ return None
+
+ if not hasattr(resource_cls, 'obj_name'):
+ return None
+
+ return resource_cls.obj_name()
+
+
+def is_valid_resource_type(resource_type):
+ return resource_type in _VALID_TYPES
+
+
+def get_resource_cls(resource_type):
+ return _TYPE_TO_CLS_MAP.get(resource_type)
import oslo_messaging
from neutron.api.rpc.callbacks import registry
+from neutron.api.rpc.callbacks import resources
from neutron.common import constants
+from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
LOG = logging.getLogger(__name__)
+class ResourcesRpcError(exceptions.NeutronException):
+ pass
+
+
+class InvalidResourceTypeClass(ResourcesRpcError):
+ message = _("Invalid resource type %(resource_type)s")
+
+
+class ResourceNotFound(ResourcesRpcError):
+ message = _("Resource %(resource_id)s of type %(resource_type)s "
+ "not found")
+
+
+def _validate_resource_type(resource_type):
+ if not resources.is_valid_resource_type(resource_type):
+ raise InvalidResourceTypeClass(resource_type=resource_type)
+
+
class ResourcesServerRpcApi(object):
"""Agent-side RPC (stub) for agent-to-plugin interaction.
This class implements the client side of an rpc interface. The server side
can be found below: ResourcesServerRpcCallback. For more information on
- changing rpc interfaces, see doc/source/devref/rpc_api.rst.
+ this RPC interface, see doc/source/devref/rpc_callbacks.rst.
"""
def __init__(self):
@log_helpers.log_method_call
def get_info(self, context, resource_type, resource_id):
+ _validate_resource_type(resource_type)
+
+ # we've already validated the resource type, so we are pretty sure the
+ # class is there => no need to validate it specifically
+ resource_type_cls = resources.get_resource_cls(resource_type)
+
cctxt = self.client.prepare()
- #TODO(Qos): add deserialize version object
- return cctxt.call(context, 'get_info',
- resource_type=resource_type, resource_id=resource_id)
+ primitive = cctxt.call(context, 'get_info',
+ resource_type=resource_type,
+ version=resource_type_cls.VERSION, resource_id=resource_id)
+
+ if primitive is None:
+ raise ResourceNotFound(resource_type=resource_type,
+ resource_id=resource_id)
+
+ obj = resource_type_cls.obj_from_primitive(primitive)
+ obj.obj_reset_changes()
+ return obj
class ResourcesServerRpcCallback(object):
This class implements the server side of an rpc interface. The client side
can be found above: ResourcesServerRpcApi. For more information on
- changing rpc interfaces, see doc/source/devref/rpc_api.rst.
+ this RPC interface, see doc/source/devref/rpc_callbacks.rst.
"""
# History
target = oslo_messaging.Target(
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
- def get_info(self, context, resource_type, resource_id):
- kwargs = {'context': context}
- #TODO(Qos): add serialize version object
- return registry.get_info(
+ def get_info(self, context, resource_type, version, resource_id):
+ _validate_resource_type(resource_type)
+
+ obj = registry.get_info(
resource_type,
resource_id,
- **kwargs)
+ context=context)
+
+ if obj:
+ return obj.obj_to_primitive(target_version=version)
LOG = logging.getLogger(__name__)
-#TODO(QoS): remove this stub when db is ready
-def _get_qos_policy_cb_stub(resource, policy_id, **kwargs):
- """Hardcoded stub for testing until we get the db working."""
- qos_policy = {
- "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
- "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
- "name": "10Mbit",
- "description": "This policy limits the ports to 10Mbit max.",
- "shared": False,
- "rules": [{
- "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
- "max_kbps": "10000",
- "max_burst_kbps": "0",
- "type": "bandwidth_limit"
- }]
- }
- return qos_policy
-
-
-def _get_qos_policy_cb(resource, policy_id, **kwargs):
+def _get_qos_policy_cb(resource_type, policy_id, **kwargs):
qos_plugin = manager.NeutronManager.get_service_plugins().get(
constants.QOS)
context = kwargs.get('context')
if context is None:
LOG.warning(_LW(
- 'Received %(resource)s %(policy_id)s without context'),
- {'resource': resource, 'policy_id': policy_id}
+ 'Received %(resource_type)s %(policy_id)s without context'),
+ {'resource_type': resource_type, 'policy_id': policy_id}
)
return
return qos_policy
-#TODO(QoS): remove this stub when db is ready
-def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs):
- """Hardcoded for testing until we get the db working."""
- bandwidth_limit = {
- "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
- "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
- "max_kbps": "10000",
- "max_burst_kbps": "0",
- }
- return bandwidth_limit
-
-
-def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
- qos_plugin = manager.NeutronManager.get_service_plugins().get(
- constants.QOS)
- context = kwargs.get('context')
- if context is None:
- LOG.warning(_LW(
- 'Received %(resource)s %(rule_id,)s without context '),
- {'resource': resource, 'rule_id,': rule_id}
- )
- return
-
- bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule(
- context,
- rule_id)
- return bandwidth_limit
-
-
class QoSPlugin(qos.QoSPluginBase):
"""Implementation of the Neutron QoS Service Plugin.
def __init__(self):
super(QoSPlugin, self).__init__()
- self.register_resource_providers()
-
- def register_resource_providers(self):
- rpc_registry.register_provider(
- _get_qos_bandwidth_limit_rule_cb_stub,
- rpc_resources.QOS_RULE)
-
rpc_registry.register_provider(
- _get_qos_policy_cb_stub,
+ _get_qos_policy_cb,
rpc_resources.QOS_POLICY)
def create_policy(self, context, policy):
}
return qos_policy
- #TODO(QoS) convert it to the version object format
- def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
- bandwidth_limit = {
- "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
- "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
- "max_kbps": "10000",
- "max_burst_kbps": "0",
- }
- return bandwidth_limit
-
- rpc_registry.register_provider(
- _get_qos_bandwidth_limit_rule_cb,
- resources.QOS_RULE)
-
rpc_registry.register_provider(
_get_qos_policy_cb,
resources.QOS_POLICY)
self.resource_id,
**kwargs)
self.assertEqual(self.resource_id, qos_policy['id'])
-
- qos_rule = rpc_registry.get_info(
- resources.QOS_RULE,
- self.qos_rule_id,
- **kwargs)
- self.assertEqual(self.qos_rule_id, qos_rule['id'])
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.api.rpc.callbacks import resources
+from neutron.objects.qos import policy
+from neutron.tests import base
+
+
+class GetResourceTypeTestCase(base.BaseTestCase):
+
+ def test_get_resource_type_none(self):
+ self.assertIsNone(resources.get_resource_type(None))
+
+ def test_get_resource_type_wrong_type(self):
+ self.assertIsNone(resources.get_resource_type(object()))
+
+ def test_get_resource_type(self):
+ # we could use any other registered NeutronObject type here
+ self.assertEqual(policy.QosPolicy.obj_name(),
+ resources.get_resource_type(policy.QosPolicy()))
+
+
+class IsValidResourceTypeTestCase(base.BaseTestCase):
+
+ def test_known_type(self):
+ # it could be any other NeutronObject, assuming it's known to RPC
+ # callbacks
+ self.assertTrue(resources.is_valid_resource_type(
+ policy.QosPolicy.obj_name()))
+
+ def test_unknown_type(self):
+ self.assertFalse(
+ resources.is_valid_resource_type('unknown-resource-type'))
+
+
+class GetResourceClsTestCase(base.BaseTestCase):
+
+ def test_known_type(self):
+ # it could be any other NeutronObject, assuming it's known to RPC
+ # callbacks
+ self.assertEqual(policy.QosPolicy,
+ resources.get_resource_cls(resources.QOS_POLICY))
+
+ def test_unknown_type(self):
+ self.assertIsNone(resources.get_resource_cls('unknown-resource-type'))
--- /dev/null
+# Copyright (c) 2015 Mellanox Technologies, Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from oslo_utils import uuidutils
+
+from neutron.api.rpc.callbacks import resources
+from neutron.api.rpc.handlers import resources_rpc
+from neutron import context
+from neutron.objects.qos import policy
+from neutron.tests import base
+
+
+class ResourcesRpcBaseTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ super(ResourcesRpcBaseTestCase, self).setUp()
+ self.context = context.get_admin_context()
+
+ def _create_test_policy_dict(self):
+ return {'id': uuidutils.generate_uuid(),
+ 'tenant_id': uuidutils.generate_uuid(),
+ 'name': 'test',
+ 'description': 'test',
+ 'shared': False}
+
+ def _create_test_policy(self, policy_dict):
+ policy_obj = policy.QosPolicy(self.context, **policy_dict)
+ policy_obj.obj_reset_changes()
+ return policy_obj
+
+
+class ResourcesServerRpcApiTestCase(ResourcesRpcBaseTestCase):
+
+ def setUp(self):
+ super(ResourcesServerRpcApiTestCase, self).setUp()
+ self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
+ self.client = self.client_p.start()
+ self.rpc = resources_rpc.ResourcesServerRpcApi()
+ self.mock_cctxt = self.rpc.client.prepare.return_value
+
+ def test_get_info(self):
+ policy_dict = self._create_test_policy_dict()
+ expected_policy_obj = self._create_test_policy(policy_dict)
+ qos_policy_id = policy_dict['id']
+ self.mock_cctxt.call.return_value = (
+ expected_policy_obj.obj_to_primitive())
+ get_info_result = self.rpc.get_info(
+ self.context, resources.QOS_POLICY, qos_policy_id)
+ self.mock_cctxt.call.assert_called_once_with(
+ self.context, 'get_info', resource_type=resources.QOS_POLICY,
+ version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
+ self.assertEqual(expected_policy_obj, get_info_result)
+
+ def test_get_info_invalid_resource_type_cls(self):
+ self.assertRaises(
+ resources_rpc.InvalidResourceTypeClass, self.rpc.get_info,
+ self.context, 'foo_type', 'foo_id')
+
+ def test_get_info_resource_not_found(self):
+ policy_dict = self._create_test_policy_dict()
+ qos_policy_id = policy_dict['id']
+ self.mock_cctxt.call.return_value = None
+ self.assertRaises(
+ resources_rpc.ResourceNotFound, self.rpc.get_info, self.context,
+ resources.QOS_POLICY, qos_policy_id)
+
+
+class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
+
+ def setUp(self):
+ super(ResourcesServerRpcCallbackTestCase, self).setUp()
+ self.callbacks = resources_rpc.ResourcesServerRpcCallback()
+
+ def test_get_info(self):
+ policy_dict = self._create_test_policy_dict()
+ policy_obj = self._create_test_policy(policy_dict)
+ qos_policy_id = policy_dict['id']
+ with mock.patch.object(resources_rpc.registry, 'get_info',
+ return_value=policy_obj) as registry_mock:
+ primitive = self.callbacks.get_info(
+ self.context, resource_type=resources.QOS_POLICY,
+ version=policy.QosPolicy.VERSION,
+ resource_id=qos_policy_id)
+ registry_mock.assert_called_once_with(
+ resources.QOS_POLICY,
+ qos_policy_id, context=self.context)
+ self.assertEqual(policy_dict, primitive['versioned_object.data'])
+ self.assertEqual(policy_obj.obj_to_primitive(), primitive)