]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Handle qos_policy on network/port create/update
authorMike Kolesnik <mkolesni@redhat.com>
Wed, 15 Jul 2015 07:44:15 +0000 (10:44 +0300)
committerIhar Hrachyshka <ihrachys@redhat.com>
Sun, 19 Jul 2015 06:33:04 +0000 (08:33 +0200)
Added handling for qos_policy_id field in the network and port
entities via ML2 extension driver.
The QoS profile will be associated to the network/port when requested as
part of the entity creation or update.

Allow ML2 extension manager to not register for any api extension
(new use case).

===

Extend the resources using the QoS extension class

Since the QoS extension for plugins is handles by this class, it makes
sense for it to handle also property extension of resources.

For ML2 this means that that extend_{network,port}_dict functions will
handle the extension of resources by calling QosExtensionHandler.
This logic can easily be reused by other plugins.

Note: we should make sure that resource extension does not require db
access, otherwise we see DBDeadLock errors and random tempest failures.
To achieve this, we define a new SQLAlchemy joined relationship on
policy bindings to make networks and ports receive those bindings on
their fetch from database. After that, the only work to do left for
resource extension handler is to copy the fetched policy into resource
dictionary.

===

Also enable new qos ml2 extension until we configure it in gate via
project-config and devstack-gate to make sure it's enabled and tested.

Co-Authored-By: Ihar Hrachyshka <ihrachys@redhat.com>
Partially-implements: blueprint quantum-qos-api
Change-Id: I1b7d4611215a471d5c24eb3d7208dcddb7e015f4

neutron/db/qos/models.py
neutron/plugins/ml2/driver_api.py
neutron/plugins/ml2/extensions/qos.py [new file with mode: 0644]
neutron/plugins/ml2/managers.py
neutron/plugins/ml2/plugin.py
neutron/services/qos/qos_extension.py [new file with mode: 0644]
neutron/services/qos/qos_plugin.py
neutron/tests/unit/services/qos/__init__.py [new file with mode: 0644]
neutron/tests/unit/services/qos/test_qos_extension.py [new file with mode: 0644]
setup.cfg

index a34b9367b17c4183b885327bcb03de5313585ee1..bf0a62d011a7b952f84da6a841da864314448b88 100755 (executable)
@@ -44,6 +44,10 @@ class QosNetworkPolicyBinding(model_base.BASEV2):
                            nullable=False,
                            unique=True,
                            primary_key=True)
+    network = sa.orm.relationship(
+        models_v2.Network,
+        backref=sa.orm.backref("qos_policy_binding", uselist=False,
+                               cascade='delete', lazy='joined'))
 
 
 class QosPortPolicyBinding(model_base.BASEV2):
@@ -59,6 +63,10 @@ class QosPortPolicyBinding(model_base.BASEV2):
                         nullable=False,
                         unique=True,
                         primary_key=True)
+    port = sa.orm.relationship(
+        models_v2.Port,
+        backref=sa.orm.backref("qos_policy_binding", uselist=False,
+                               cascade='delete', lazy='joined'))
 
 
 class QosRule(model_base.BASEV2, models_v2.HasId):
index 3284832beebca18670c37a670a5e88ff1c0e366d..c54ab1ba35abadbb95303f8fd6025d27d63e1139 100644 (file)
@@ -911,12 +911,14 @@ class ExtensionDriver(object):
         """
         pass
 
-    @abc.abstractproperty
+    @property
     def extension_alias(self):
         """Supported extension alias.
 
         Return the alias identifying the core API extension supported
-        by this driver.
+        by this driver. Do not declare if API extension handling will
+        be left to a service plugin, and we just need to provide
+        core resource extension and updates.
         """
         pass
 
diff --git a/neutron/plugins/ml2/extensions/qos.py b/neutron/plugins/ml2/extensions/qos.py
new file mode 100644 (file)
index 0000000..a11b232
--- /dev/null
@@ -0,0 +1,50 @@
+# Copyright (c) 2015 Red Hat Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo_log import log as logging
+
+from neutron.plugins.ml2 import driver_api as api
+from neutron.services.qos import qos_extension
+
+LOG = logging.getLogger(__name__)
+
+
+class QosExtensionDriver(api.ExtensionDriver):
+
+    def initialize(self):
+        self.qos_ext_handler = qos_extension.QosResourceExtensionHandler()
+        LOG.debug("QosExtensionDriver initialization complete")
+
+    def process_create_network(self, context, data, result):
+        self.qos_ext_handler.process_resource(
+            context, qos_extension.NETWORK, data, result)
+
+    process_update_network = process_create_network
+
+    def process_create_port(self, context, data, result):
+        self.qos_ext_handler.process_resource(
+            context, qos_extension.PORT, data, result)
+
+    process_update_port = process_create_port
+
+    def extend_network_dict(self, session, db_data, result):
+        result.update(
+            self.qos_ext_handler.extract_resource_fields(qos_extension.NETWORK,
+                                                         db_data))
+
+    def extend_port_dict(self, session, db_data, result):
+        result.update(
+            self.qos_ext_handler.extract_resource_fields(qos_extension.PORT,
+                                                         db_data))
index 1d1d204a0c58529890abf0558b65ba8ef71ab7fa..9f2e4af870a0a2b4408c61afb6221a5e54a23500 100644 (file)
@@ -723,10 +723,14 @@ class ExtensionManager(stevedore.named.NamedExtensionManager):
         # the order in which the drivers are called.
         self.ordered_ext_drivers = []
 
+        #TODO(QoS): enforce qos extension until we enable it in devstack-gate
+        drivers = cfg.CONF.ml2.extension_drivers
+        if 'qos' not in drivers:
+            drivers += ['qos']
         LOG.info(_LI("Configured extension driver names: %s"),
-                 cfg.CONF.ml2.extension_drivers)
+                 drivers)
         super(ExtensionManager, self).__init__('neutron.ml2.extension_drivers',
-                                               cfg.CONF.ml2.extension_drivers,
+                                               drivers,
                                                invoke_on_load=True,
                                                name_order=True)
         LOG.info(_LI("Loaded extension driver names: %s"), self.names())
@@ -753,9 +757,10 @@ class ExtensionManager(stevedore.named.NamedExtensionManager):
         exts = []
         for driver in self.ordered_ext_drivers:
             alias = driver.obj.extension_alias
-            exts.append(alias)
-            LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
-                     {'alias': alias, 'drv': driver.name})
+            if alias:
+                exts.append(alias)
+                LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
+                         {'alias': alias, 'drv': driver.name})
         return exts
 
     def _call_on_ext_drivers(self, method_name, plugin_context, data, result):
index 9fa6eb3f9a2e8802c6cafd146f7436b9ac9ebf06..3c92d9820d9b6bc666a4c23b0c95f0f6922ed781 100644 (file)
@@ -64,6 +64,7 @@ from neutron.extensions import extra_dhcp_opt as edo_ext
 from neutron.extensions import portbindings
 from neutron.extensions import portsecurity as psec
 from neutron.extensions import providernet as provider
+from neutron.extensions import qos
 from neutron.extensions import vlantransparent
 from neutron.i18n import _LE, _LI, _LW
 from neutron import manager
@@ -1140,6 +1141,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                         original_port[psec.PORTSECURITY] !=
                         updated_port[psec.PORTSECURITY]):
                 need_port_update_notify = True
+            # TODO(QoS): Move out to the extension framework somehow.
+            # Follow https://review.openstack.org/#/c/169223 for a solution.
+            if (qos.QOS_POLICY_ID in attrs and
+                    original_port[qos.QOS_POLICY_ID] !=
+                    updated_port[qos.QOS_POLICY_ID]):
+                need_port_update_notify = True
 
             if addr_pair.ADDRESS_PAIRS in attrs:
                 need_port_update_notify |= (
diff --git a/neutron/services/qos/qos_extension.py b/neutron/services/qos/qos_extension.py
new file mode 100644 (file)
index 0000000..2cae032
--- /dev/null
@@ -0,0 +1,82 @@
+# Copyright (c) 2015 Red Hat Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.extensions import qos
+from neutron import manager
+from neutron.objects.qos import policy as policy_object
+from neutron.plugins.common import constants as plugin_constants
+
+NETWORK = 'network'
+PORT = 'port'
+
+
+# TODO(QoS): Add interface to define how this should look like
+class QosResourceExtensionHandler(object):
+
+    @property
+    def plugin_loaded(self):
+        if not hasattr(self, '_plugin_loaded'):
+            service_plugins = manager.NeutronManager.get_service_plugins()
+            self._plugin_loaded = plugin_constants.QOS in service_plugins
+        return self._plugin_loaded
+
+    def _get_policy_obj(self, context, policy_id):
+        return policy_object.QosPolicy.get_by_id(context, policy_id)
+
+    def _update_port_policy(self, context, port, port_changes):
+        old_policy = policy_object.QosPolicy.get_port_policy(
+            context, port['id'])
+        if old_policy:
+            #TODO(QoS): this means two transactions. One for detaching
+            #           one for re-attaching, we may want to update
+            #           within a single transaction instead, or put
+            #           a whole transaction on top, or handle the switch
+            #           at db api level automatically within transaction.
+            old_policy.detach_port(port['id'])
+
+        qos_policy_id = port_changes.get(qos.QOS_POLICY_ID)
+        if qos_policy_id is not None:
+            policy = self._get_policy_obj(context, qos_policy_id)
+            policy.attach_port(port['id'])
+            port[qos.QOS_POLICY_ID] = qos_policy_id
+
+    def _update_network_policy(self, context, network, network_changes):
+        old_policy = policy_object.QosPolicy.get_network_policy(
+            context, network['id'])
+        if old_policy:
+            old_policy.detach_network(network['id'])
+
+        qos_policy_id = network_changes.get(qos.QOS_POLICY_ID)
+        if qos_policy_id:
+            policy = self._get_policy_obj(context, qos_policy_id)
+            policy.attach_network(network['id'])
+            network[qos.QOS_POLICY_ID] = qos_policy_id
+
+    def _exec(self, method_name, context, kwargs):
+        return getattr(self, method_name)(context=context, **kwargs)
+
+    def process_resource(self, context, resource_type, requested_resource,
+                         actual_resource):
+        if qos.QOS_POLICY_ID in requested_resource and self.plugin_loaded:
+            self._exec('_update_%s_policy' % resource_type, context,
+                       {resource_type: actual_resource,
+                        "%s_changes" % resource_type: requested_resource})
+
+    def extract_resource_fields(self, resource_type, resource):
+        if not self.plugin_loaded:
+            return {}
+
+        binding = resource['qos_policy_binding']
+        return {qos.QOS_POLICY_ID: binding['policy_id'] if binding else None}
index 6ef13ae62f51ddcea7318ecc695110297ebc9038..2184d8a1702d1881bc5dafd0c40a6449b03f6d47 100644 (file)
@@ -17,9 +17,6 @@ from neutron import manager
 
 from neutron.api.rpc.callbacks import registry as rpc_registry
 from neutron.api.rpc.callbacks import resources as rpc_resources
-from neutron.callbacks import events
-from neutron.callbacks import registry
-from neutron.callbacks import resources
 from neutron.extensions import qos
 from neutron.i18n import _LW
 from neutron.objects.qos import policy as policy_object
@@ -108,8 +105,6 @@ class QoSPlugin(qos.QoSPluginBase):
     def __init__(self):
         super(QoSPlugin, self).__init__()
         self.register_resource_providers()
-        self.register_port_callbacks()
-        self.register_net_callbacks()
 
     def register_resource_providers(self):
         rpc_registry.register_provider(
@@ -120,55 +115,6 @@ class QoSPlugin(qos.QoSPluginBase):
             _get_qos_policy_cb_stub,
             rpc_resources.QOS_POLICY)
 
-    def register_port_callbacks(self):
-        registry.subscribe(
-            self._extend_port_policy_data, resources.PORT, events.AFTER_READ)
-
-    def _extend_port_policy_data(self, resource, event, trigger, **kwargs):
-        context = kwargs['context']
-        port = kwargs['port']
-        policy = policy_object.QosPolicy.get_port_policy(context, port['id'])
-        port['qos_policy_id'] = policy.id if policy else None
-
-    def update_port_policy(self, context, port):
-        old_policy = policy_object.QosPolicy.get_port_policy(
-            context, port['id'])
-        if old_policy is not None:
-            #TODO(QoS): this means two transactions. One for detaching
-            #           one for re-attaching, we may want to update
-            #           within a single transaction instead, or put
-            #           a whole transaction on top, or handle the switch
-            #           at db api level automatically within transaction.
-            old_policy.detach_port(port['id'])
-
-        qos_policy_id = port.get('qos_policy_id')
-        if qos_policy_id is not None:
-            policy = self._get_policy_obj(context, qos_policy_id)
-            policy.attach_port(port['id'])
-
-    def register_net_callbacks(self):
-        registry.subscribe(self._extend_network_policy_data,
-                           resources.NETWORK,
-                           events.AFTER_READ)
-
-    def _extend_network_policy_data(self, resource, event, trigger, **kwargs):
-        context = kwargs['context']
-        network = kwargs['network']
-        policy = policy_object.QosPolicy.get_network_policy(
-            context, network['id'])
-        network['qos_policy_id'] = policy.id if policy else None
-
-    def update_network_policy(self, context, network):
-        old_policy = policy_object.QosPolicy.get_network_policy(
-            context, network['id'])
-        if old_policy:
-            old_policy.detach_network(network['id'])
-
-        qos_policy_id = network.get('qos_policy_id')
-        if qos_policy_id:
-            policy = self._get_policy_obj(context, qos_policy_id)
-            policy.attach_network(network['id'])
-
     def create_policy(self, context, policy):
         policy = policy_object.QosPolicy(context, **policy['policy'])
         policy.create()
diff --git a/neutron/tests/unit/services/qos/__init__.py b/neutron/tests/unit/services/qos/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/unit/services/qos/test_qos_extension.py b/neutron/tests/unit/services/qos/test_qos_extension.py
new file mode 100644 (file)
index 0000000..3113506
--- /dev/null
@@ -0,0 +1,148 @@
+# Copyright (c) 2015 Red Hat Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from neutron.extensions import qos
+from neutron.plugins.common import constants as plugin_constants
+from neutron.services.qos import qos_extension
+from neutron.tests import base
+
+
+def _get_test_dbdata(qos_policy_id):
+    return {'id': None, 'qos_policy_binding': {'policy_id': qos_policy_id,
+                                               'network_id': 'fake_net_id'}}
+
+
+class QosResourceExtensionHandlerTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        super(QosResourceExtensionHandlerTestCase, self).setUp()
+        self.ext_handler = qos_extension.QosResourceExtensionHandler()
+        policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy')
+        self.policy_m = policy_p.start()
+
+    def test_process_resource_no_qos_policy_id(self):
+        self.ext_handler.process_resource(None, qos_extension.PORT, {}, None)
+        self.assertFalse(self.policy_m.called)
+
+    def _mock_plugin_loaded(self, plugin_loaded):
+        plugins = {}
+        if plugin_loaded:
+            plugins[plugin_constants.QOS] = None
+        return mock.patch('neutron.manager.NeutronManager.get_service_plugins',
+                          return_value=plugins)
+
+    def test_process_resource_no_qos_plugin_loaded(self):
+        with self._mock_plugin_loaded(False):
+            self.ext_handler.process_resource(None, qos_extension.PORT,
+                                              {qos.QOS_POLICY_ID: None}, None)
+            self.assertFalse(self.policy_m.called)
+
+    def test_process_resource_port_new_policy(self):
+        with self._mock_plugin_loaded(True):
+            qos_policy_id = mock.Mock()
+            actual_port = {'id': mock.Mock(),
+                           qos.QOS_POLICY_ID: qos_policy_id}
+            qos_policy = mock.MagicMock()
+            self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
+            self.ext_handler.process_resource(
+                None, qos_extension.PORT, {qos.QOS_POLICY_ID: qos_policy_id},
+                actual_port)
+
+            qos_policy.attach_port.assert_called_once_with(actual_port['id'])
+
+    def test_process_resource_port_updated_policy(self):
+        with self._mock_plugin_loaded(True):
+            qos_policy_id = mock.Mock()
+            port_id = mock.Mock()
+            actual_port = {'id': port_id,
+                           qos.QOS_POLICY_ID: qos_policy_id}
+            old_qos_policy = mock.MagicMock()
+            self.policy_m.get_port_policy = mock.Mock(
+                return_value=old_qos_policy)
+            new_qos_policy = mock.MagicMock()
+            self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
+            self.ext_handler.process_resource(
+                None, qos_extension.PORT, {qos.QOS_POLICY_ID: qos_policy_id},
+                actual_port)
+
+            old_qos_policy.detach_port.assert_called_once_with(port_id)
+            new_qos_policy.attach_port.assert_called_once_with(port_id)
+
+    def test_process_resource_network_new_policy(self):
+        with self._mock_plugin_loaded(True):
+            qos_policy_id = mock.Mock()
+            actual_network = {'id': mock.Mock(),
+                              qos.QOS_POLICY_ID: qos_policy_id}
+            qos_policy = mock.MagicMock()
+            self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
+            self.ext_handler.process_resource(
+                None, qos_extension.NETWORK,
+                {qos.QOS_POLICY_ID: qos_policy_id}, actual_network)
+
+            qos_policy.attach_network.assert_called_once_with(
+                actual_network['id'])
+
+    def test_process_resource_network_updated_policy(self):
+        with self._mock_plugin_loaded(True):
+            qos_policy_id = mock.Mock()
+            network_id = mock.Mock()
+            actual_network = {'id': network_id,
+                              qos.QOS_POLICY_ID: qos_policy_id}
+            old_qos_policy = mock.MagicMock()
+            self.policy_m.get_network_policy = mock.Mock(
+                return_value=old_qos_policy)
+            new_qos_policy = mock.MagicMock()
+            self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
+            self.ext_handler.process_resource(
+                None, qos_extension.NETWORK,
+                {qos.QOS_POLICY_ID: qos_policy_id}, actual_network)
+
+            old_qos_policy.detach_network.assert_called_once_with(network_id)
+            new_qos_policy.attach_network.assert_called_once_with(network_id)
+
+    def test_extract_resource_fields_plugin_not_loaded(self):
+        with self._mock_plugin_loaded(False):
+            fields = self.ext_handler.extract_resource_fields(None, None)
+            self.assertEqual({}, fields)
+
+    def _test_extract_resource_fields_for_port(self, qos_policy_id):
+        with self._mock_plugin_loaded(True):
+            fields = self.ext_handler.extract_resource_fields(
+                qos_extension.PORT, _get_test_dbdata(qos_policy_id))
+            self.assertEqual({qos.QOS_POLICY_ID: qos_policy_id}, fields)
+
+    def test_extract_resource_fields_no_port_policy(self):
+        self._test_extract_resource_fields_for_port(None)
+
+    def test_extract_resource_fields_port_policy_exists(self):
+        qos_policy_id = mock.Mock()
+        self._test_extract_resource_fields_for_port(qos_policy_id)
+
+    def _test_extract_resource_fields_for_network(self, qos_policy_id):
+        with self._mock_plugin_loaded(True):
+            fields = self.ext_handler.extract_resource_fields(
+                qos_extension.NETWORK, _get_test_dbdata(qos_policy_id))
+            self.assertEqual({qos.QOS_POLICY_ID: qos_policy_id}, fields)
+
+    def test_extract_resource_fields_no_network_policy(self):
+        self._test_extract_resource_fields_for_network(None)
+
+    def test_extract_resource_fields_network_policy_exists(self):
+        qos_policy_id = mock.Mock()
+        qos_policy = mock.Mock()
+        qos_policy.id = qos_policy_id
+        self._test_extract_resource_fields_for_network(qos_policy_id)
index 8cfc58fa3c4f196bd3fc8d56c7c6eebcdc3f25f4..f6d873f0e4413f9354e5b799888f41616561c280 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -197,6 +197,7 @@ neutron.ml2.extension_drivers =
     testdb = neutron.tests.unit.plugins.ml2.drivers.ext_test:TestDBExtensionDriver
     port_security = neutron.plugins.ml2.extensions.port_security:PortSecurityExtensionDriver
     cisco_n1kv_ext = neutron.plugins.ml2.drivers.cisco.n1kv.n1kv_ext_driver:CiscoN1kvExtensionDriver
+    qos = neutron.plugins.ml2.extensions.qos:QosExtensionDriver
 neutron.openstack.common.cache.backends =
     memory = neutron.openstack.common.cache._backends.memory:MemoryBackend
 neutron.ipam_drivers =