]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add pluggable backend driver for QoS Service notification
authorEran Gampel <eran@gampel.net>
Wed, 1 Jul 2015 15:32:30 +0000 (18:32 +0300)
committerIhar Hrachyshka <ihrachys@redhat.com>
Tue, 28 Jul 2015 19:30:04 +0000 (21:30 +0200)
Added a reference driver for the agent based solutions
RPC sending the messages over the message queue

Partially-Implements: blueprint quantum-qos-api
Change-Id: I725c876739ff85b4db8fb053de0362ce367ae78c

neutron/services/qos/notification_drivers/__init__.py [new file with mode: 0644]
neutron/services/qos/notification_drivers/message_queue.py [new file with mode: 0644]
neutron/services/qos/notification_drivers/qos_base.py [new file with mode: 0644]
neutron/services/qos/qos_plugin.py
neutron/tests/unit/services/qos/notification_drivers/__init__.py [new file with mode: 0644]
neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py [new file with mode: 0644]
neutron/tests/unit/services/qos/test_qos_plugin.py [new file with mode: 0644]

diff --git a/neutron/services/qos/notification_drivers/__init__.py b/neutron/services/qos/notification_drivers/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/services/qos/notification_drivers/message_queue.py b/neutron/services/qos/notification_drivers/message_queue.py
new file mode 100644 (file)
index 0000000..2cce274
--- /dev/null
@@ -0,0 +1,70 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo_log import log as logging
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks import resources
+from neutron.i18n import _LW
+from neutron.objects.qos import policy as policy_object
+from neutron.services.qos.notification_drivers import qos_base
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _get_qos_policy_cb(resource, policy_id, **kwargs):
+    context = kwargs.get('context')
+    if context is None:
+        LOG.warning(_LW(
+            'Received %(resource)s %(policy_id)s without context'),
+            {'resource': resource, 'policy_id': policy_id}
+        )
+        return
+
+    policy = policy_object.QosPolicy.get_by_id(context, policy_id)
+    return policy
+
+
+class RpcQosServiceNotificationDriver(
+    qos_base.QosServiceNotificationDriverBase):
+    """RPC message queue service notification driver for QoS."""
+
+    def __init__(self):
+        LOG.debug(
+            "Initializing RPC Messaging Queue notification driver for QoS")
+        rpc_registry.register_provider(
+            _get_qos_policy_cb,
+            resources.QOS_POLICY)
+
+    def create_policy(self, policy):
+        #No need to update agents on create
+        pass
+
+    def update_policy(self, policy):
+        # TODO(QoS): this is temporary until we get notify() implemented
+        try:
+            rpc_registry.notify(resources.QOS_POLICY,
+                                events.UPDATED,
+                                policy)
+        except NotImplementedError:
+            pass
+
+    def delete_policy(self, policy):
+        # TODO(QoS): this is temporary until we get notify() implemented
+        try:
+            rpc_registry.notify(resources.QOS_POLICY,
+                                events.DELETED,
+                                policy)
+        except NotImplementedError:
+            pass
diff --git a/neutron/services/qos/notification_drivers/qos_base.py b/neutron/services/qos/notification_drivers/qos_base.py
new file mode 100644 (file)
index 0000000..86d792c
--- /dev/null
@@ -0,0 +1,37 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class QosServiceNotificationDriverBase(object):
+    """QoS service notification driver base class."""
+
+    @abc.abstractmethod
+    def create_policy(self, policy):
+        """Create the QoS policy."""
+
+    @abc.abstractmethod
+    def update_policy(self, policy):
+        """Update the QoS policy.
+
+        Apply changes to the QoS policy.
+        """
+
+    @abc.abstractmethod
+    def delete_policy(self, policy):
+        """Delete the QoS policy.
+
+        Remove all rules for this policy and free up all the resources.
+        """
index fb84aa9de1580dc644ef7b50f53a4da92e359671..92d58131b1a134fa01c27d814354441b1b470e30 100644 (file)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+from oslo_log import log as logging
 
-from neutron import manager
 
-from neutron.api.rpc.callbacks import registry as rpc_registry
-from neutron.api.rpc.callbacks import resources as rpc_resources
 from neutron.db import db_base_plugin_common
 from neutron.extensions import qos
-from neutron.i18n import _LW
 from neutron.objects.qos import policy as policy_object
 from neutron.objects.qos import rule as rule_object
 from neutron.objects.qos import rule_type as rule_type_object
-from neutron.plugins.common import constants
-
-from oslo_log import log as logging
+from neutron.services.qos.notification_drivers import message_queue
 
 
 LOG = logging.getLogger(__name__)
 
 
-def _get_qos_policy_cb(resource_type, policy_id, **kwargs):
-    qos_plugin = manager.NeutronManager.get_service_plugins().get(
-        constants.QOS)
-    context = kwargs.get('context')
-    if context is None:
-        LOG.warning(_LW(
-            'Received %(resource_type)s %(policy_id)s without context'),
-            {'resource_type': resource_type, 'policy_id': policy_id}
-        )
-        return
-
-    qos_policy = qos_plugin.get_qos_policy(context, policy_id)
-    return qos_policy
-
-
 class QoSPlugin(qos.QoSPluginBase):
     """Implementation of the Neutron QoS Service Plugin.
 
@@ -58,29 +38,36 @@ class QoSPlugin(qos.QoSPluginBase):
 
     def __init__(self):
         super(QoSPlugin, self).__init__()
-        rpc_registry.register_provider(
-            _get_qos_policy_cb,
-            rpc_resources.QOS_POLICY)
+        #TODO(QoS) load from configuration option
+        self.notification_driver = (
+            message_queue.RpcQosServiceNotificationDriver())
 
     def create_policy(self, context, policy):
         policy = policy_object.QosPolicy(context, **policy['policy'])
         policy.create()
+        self.notification_driver.create_policy(policy)
         return policy.to_dict()
 
     def update_policy(self, context, policy_id, policy):
         policy = policy_object.QosPolicy(context, **policy['policy'])
         policy.id = policy_id
         policy.update()
+        self.notification_driver.update_policy(policy)
         return policy.to_dict()
 
     def delete_policy(self, context, policy_id):
         policy = policy_object.QosPolicy(context)
         policy.id = policy_id
+        self.notification_driver.delete_policy(policy)
         policy.delete()
 
     def _get_policy_obj(self, context, policy_id):
         return policy_object.QosPolicy.get_by_id(context, policy_id)
 
+    def _update_policy_on_driver(self, context, policy_id):
+        policy = self._get_policy_obj(context, policy_id)
+        self.notification_driver.update_policy(policy)
+
     @db_base_plugin_common.filter_fields
     def get_policy(self, context, policy_id, fields=None):
         return self._get_policy_obj(context, policy_id).to_dict()
@@ -107,6 +94,7 @@ class QoSPlugin(qos.QoSPluginBase):
             context, qos_policy_id=policy_id,
             **bandwidth_limit_rule['bandwidth_limit_rule'])
         rule.create()
+        self._update_policy_on_driver(context, policy_id)
         return rule.to_dict()
 
     def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
@@ -115,12 +103,14 @@ class QoSPlugin(qos.QoSPluginBase):
             context, **bandwidth_limit_rule['bandwidth_limit_rule'])
         rule.id = rule_id
         rule.update()
+        self._update_policy_on_driver(context, policy_id)
         return rule.to_dict()
 
     def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
         rule = rule_object.QosBandwidthLimitRule(context)
         rule.id = rule_id
         rule.delete()
+        self._update_policy_on_driver(context, policy_id)
 
     @db_base_plugin_common.filter_fields
     def get_policy_bandwidth_limit_rule(self, context, rule_id,
diff --git a/neutron/tests/unit/services/qos/notification_drivers/__init__.py b/neutron/tests/unit/services/qos/notification_drivers/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py
new file mode 100644 (file)
index 0000000..a4f163f
--- /dev/null
@@ -0,0 +1,72 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import resources
+from neutron import context
+from neutron.objects.qos import policy as policy_object
+from neutron.objects.qos import rule as rule_object
+from neutron.services.qos.notification_drivers import message_queue
+from neutron.tests import base
+
+DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+
+
+class TestQosRpcNotificationDriver(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestQosRpcNotificationDriver, self).setUp()
+
+        registry_p = mock.patch(
+                            'neutron.api.rpc.callbacks.registry.notify')
+        self.registry_m = registry_p.start()
+        self.driver = message_queue.RpcQosServiceNotificationDriver()
+
+        self.policy_data = {'policy': {
+                            'id': 7777777,
+                            'tenant_id': 888888,
+                            'name': 'testi-policy',
+                            'description': 'test policyi description',
+                            'shared': True}}
+
+        self.rule_data = {'bandwidth_limit_rule': {
+                            'id': 7777777,
+                            'max_kbps': 100,
+                            'max_burst_kbps': 150}}
+
+        self.policy = policy_object.QosPolicy(context,
+                        **self.policy_data['policy'])
+
+        self.rule = rule_object.QosBandwidthLimitRule(
+                                context,
+                                **self.rule_data['bandwidth_limit_rule'])
+
+    def _validate_registry_params(self, event_type, policy):
+        self.assertTrue(self.registry_m.called, policy)
+        self.registry_m.assert_called_once_with(
+                resources.QOS_POLICY,
+                event_type,
+                policy)
+
+    def test_create_policy(self):
+        self.driver.create_policy(self.policy)
+        self.assertFalse(self.registry_m.called)
+
+    def test_update_policy(self):
+        self.driver.update_policy(self.policy)
+        self._validate_registry_params(events.UPDATED, self.policy)
+
+    def test_delete_policy(self):
+        self.driver.delete_policy(self.policy)
+        self._validate_registry_params(events.DELETED, self.policy)
diff --git a/neutron/tests/unit/services/qos/test_qos_plugin.py b/neutron/tests/unit/services/qos/test_qos_plugin.py
new file mode 100644 (file)
index 0000000..d4927b6
--- /dev/null
@@ -0,0 +1,103 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+from oslo_config import cfg
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import resources
+from neutron import context
+from neutron import manager
+from neutron.objects.qos import policy as policy_object
+from neutron.objects.qos import rule as rule_object
+from neutron.plugins.common import constants
+from neutron.tests import base
+
+
+DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+
+
+class TestQosPlugin(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestQosPlugin, self).setUp()
+        self.setup_coreplugin()
+
+        mock.patch('neutron.db.api.create_object').start()
+        mock.patch('neutron.db.api.update_object').start()
+        mock.patch('neutron.db.api.delete_object').start()
+        mock.patch('neutron.db.api.get_object').start()
+        mock.patch(
+            'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
+        self.registry_p = mock.patch(
+            'neutron.api.rpc.callbacks.registry.notify')
+        self.registry_m = self.registry_p.start()
+        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
+        cfg.CONF.set_override("service_plugins", ["qos"])
+
+        mgr = manager.NeutronManager.get_instance()
+        self.qos_plugin = mgr.get_service_plugins().get(
+            constants.QOS)
+        self.ctxt = context.Context('fake_user', 'fake_tenant')
+        self.policy_data = {
+            'policy': {'id': 7777777,
+                       'tenant_id': 888888,
+                       'name': 'test-policy',
+                       'description': 'Test policy description',
+                       'shared': True}}
+
+        self.rule_data = {
+            'bandwidth_limit_rule': {'id': 7777777,
+                                     'max_kbps': 100,
+                                     'max_burst_kbps': 150}}
+
+        self.policy = policy_object.QosPolicy(
+            context, **self.policy_data['policy'])
+
+        self.rule = rule_object.QosBandwidthLimitRule(
+            context, **self.rule_data['bandwidth_limit_rule'])
+
+    def _validate_registry_params(self, event_type):
+        self.registry_m.assert_called_once_with(
+            resources.QOS_POLICY,
+            event_type,
+            mock.ANY)
+        self.assertIsInstance(
+            self.registry_m.call_args[0][2], policy_object.QosPolicy)
+
+    def test_qos_plugin_add_policy(self):
+        self.qos_plugin.create_policy(self.ctxt, self.policy_data)
+        self.assertFalse(self.registry_m.called)
+
+    def test_qos_plugin_update_policy(self):
+        self.qos_plugin.update_policy(
+            self.ctxt, self.policy.id, self.policy_data)
+        self._validate_registry_params(events.UPDATED)
+
+    def test_qos_plugin_delete_policy(self):
+        self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
+        self._validate_registry_params(events.DELETED)
+
+    def test_qos_plugin_create_policy_rule(self):
+        self.qos_plugin.create_policy_bandwidth_limit_rule(
+            self.ctxt, self.policy.id, self.rule_data)
+        self._validate_registry_params(events.UPDATED)
+
+    def test_qos_plugin_update_policy_rule(self):
+        self.qos_plugin.update_policy_bandwidth_limit_rule(
+            self.ctxt, self.rule.id, self.policy.id, self.rule_data)
+        self._validate_registry_params(events.UPDATED)
+
+    def test_qos_plugin_delete_policy_rule(self):
+        self.qos_plugin.delete_policy_bandwidth_limit_rule(
+            self.ctxt, self.rule.id, self.policy.id)
+        self._validate_registry_params(events.UPDATED)