nullable=False,
unique=True,
primary_key=True)
+ network = sa.orm.relationship(
+ models_v2.Network,
+ backref=sa.orm.backref("qos_policy_binding", uselist=False,
+ cascade='delete', lazy='joined'))
class QosPortPolicyBinding(model_base.BASEV2):
nullable=False,
unique=True,
primary_key=True)
+ port = sa.orm.relationship(
+ models_v2.Port,
+ backref=sa.orm.backref("qos_policy_binding", uselist=False,
+ cascade='delete', lazy='joined'))
class QosRule(model_base.BASEV2, models_v2.HasId):
"""
pass
- @abc.abstractproperty
+ @property
def extension_alias(self):
"""Supported extension alias.
Return the alias identifying the core API extension supported
- by this driver.
+ by this driver. Do not declare if API extension handling will
+ be left to a service plugin, and we just need to provide
+ core resource extension and updates.
"""
pass
--- /dev/null
+# Copyright (c) 2015 Red Hat Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import log as logging
+
+from neutron.plugins.ml2 import driver_api as api
+from neutron.services.qos import qos_extension
+
+LOG = logging.getLogger(__name__)
+
+
+class QosExtensionDriver(api.ExtensionDriver):
+
+ def initialize(self):
+ self.qos_ext_handler = qos_extension.QosResourceExtensionHandler()
+ LOG.debug("QosExtensionDriver initialization complete")
+
+ def process_create_network(self, context, data, result):
+ self.qos_ext_handler.process_resource(
+ context, qos_extension.NETWORK, data, result)
+
+ process_update_network = process_create_network
+
+ def process_create_port(self, context, data, result):
+ self.qos_ext_handler.process_resource(
+ context, qos_extension.PORT, data, result)
+
+ process_update_port = process_create_port
+
+ def extend_network_dict(self, session, db_data, result):
+ result.update(
+ self.qos_ext_handler.extract_resource_fields(qos_extension.NETWORK,
+ db_data))
+
+ def extend_port_dict(self, session, db_data, result):
+ result.update(
+ self.qos_ext_handler.extract_resource_fields(qos_extension.PORT,
+ db_data))
# the order in which the drivers are called.
self.ordered_ext_drivers = []
+ #TODO(QoS): enforce qos extension until we enable it in devstack-gate
+ drivers = cfg.CONF.ml2.extension_drivers
+ if 'qos' not in drivers:
+ drivers += ['qos']
LOG.info(_LI("Configured extension driver names: %s"),
- cfg.CONF.ml2.extension_drivers)
+ drivers)
super(ExtensionManager, self).__init__('neutron.ml2.extension_drivers',
- cfg.CONF.ml2.extension_drivers,
+ drivers,
invoke_on_load=True,
name_order=True)
LOG.info(_LI("Loaded extension driver names: %s"), self.names())
exts = []
for driver in self.ordered_ext_drivers:
alias = driver.obj.extension_alias
- exts.append(alias)
- LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
- {'alias': alias, 'drv': driver.name})
+ if alias:
+ exts.append(alias)
+ LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
+ {'alias': alias, 'drv': driver.name})
return exts
def _call_on_ext_drivers(self, method_name, plugin_context, data, result):
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as provider
+from neutron.extensions import qos
from neutron.extensions import vlantransparent
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
original_port[psec.PORTSECURITY] !=
updated_port[psec.PORTSECURITY]):
need_port_update_notify = True
+ # TODO(QoS): Move out to the extension framework somehow.
+ # Follow https://review.openstack.org/#/c/169223 for a solution.
+ if (qos.QOS_POLICY_ID in attrs and
+ original_port[qos.QOS_POLICY_ID] !=
+ updated_port[qos.QOS_POLICY_ID]):
+ need_port_update_notify = True
if addr_pair.ADDRESS_PAIRS in attrs:
need_port_update_notify |= (
--- /dev/null
+# Copyright (c) 2015 Red Hat Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.extensions import qos
+from neutron import manager
+from neutron.objects.qos import policy as policy_object
+from neutron.plugins.common import constants as plugin_constants
+
+NETWORK = 'network'
+PORT = 'port'
+
+
+# TODO(QoS): Add interface to define how this should look like
+class QosResourceExtensionHandler(object):
+
+ @property
+ def plugin_loaded(self):
+ if not hasattr(self, '_plugin_loaded'):
+ service_plugins = manager.NeutronManager.get_service_plugins()
+ self._plugin_loaded = plugin_constants.QOS in service_plugins
+ return self._plugin_loaded
+
+ def _get_policy_obj(self, context, policy_id):
+ return policy_object.QosPolicy.get_by_id(context, policy_id)
+
+ def _update_port_policy(self, context, port, port_changes):
+ old_policy = policy_object.QosPolicy.get_port_policy(
+ context, port['id'])
+ if old_policy:
+ #TODO(QoS): this means two transactions. One for detaching
+ # one for re-attaching, we may want to update
+ # within a single transaction instead, or put
+ # a whole transaction on top, or handle the switch
+ # at db api level automatically within transaction.
+ old_policy.detach_port(port['id'])
+
+ qos_policy_id = port_changes.get(qos.QOS_POLICY_ID)
+ if qos_policy_id is not None:
+ policy = self._get_policy_obj(context, qos_policy_id)
+ policy.attach_port(port['id'])
+ port[qos.QOS_POLICY_ID] = qos_policy_id
+
+ def _update_network_policy(self, context, network, network_changes):
+ old_policy = policy_object.QosPolicy.get_network_policy(
+ context, network['id'])
+ if old_policy:
+ old_policy.detach_network(network['id'])
+
+ qos_policy_id = network_changes.get(qos.QOS_POLICY_ID)
+ if qos_policy_id:
+ policy = self._get_policy_obj(context, qos_policy_id)
+ policy.attach_network(network['id'])
+ network[qos.QOS_POLICY_ID] = qos_policy_id
+
+ def _exec(self, method_name, context, kwargs):
+ return getattr(self, method_name)(context=context, **kwargs)
+
+ def process_resource(self, context, resource_type, requested_resource,
+ actual_resource):
+ if qos.QOS_POLICY_ID in requested_resource and self.plugin_loaded:
+ self._exec('_update_%s_policy' % resource_type, context,
+ {resource_type: actual_resource,
+ "%s_changes" % resource_type: requested_resource})
+
+ def extract_resource_fields(self, resource_type, resource):
+ if not self.plugin_loaded:
+ return {}
+
+ binding = resource['qos_policy_binding']
+ return {qos.QOS_POLICY_ID: binding['policy_id'] if binding else None}
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources as rpc_resources
-from neutron.callbacks import events
-from neutron.callbacks import registry
-from neutron.callbacks import resources
from neutron.extensions import qos
from neutron.i18n import _LW
from neutron.objects.qos import policy as policy_object
def __init__(self):
super(QoSPlugin, self).__init__()
self.register_resource_providers()
- self.register_port_callbacks()
- self.register_net_callbacks()
def register_resource_providers(self):
rpc_registry.register_provider(
_get_qos_policy_cb_stub,
rpc_resources.QOS_POLICY)
- def register_port_callbacks(self):
- registry.subscribe(
- self._extend_port_policy_data, resources.PORT, events.AFTER_READ)
-
- def _extend_port_policy_data(self, resource, event, trigger, **kwargs):
- context = kwargs['context']
- port = kwargs['port']
- policy = policy_object.QosPolicy.get_port_policy(context, port['id'])
- port['qos_policy_id'] = policy.id if policy else None
-
- def update_port_policy(self, context, port):
- old_policy = policy_object.QosPolicy.get_port_policy(
- context, port['id'])
- if old_policy is not None:
- #TODO(QoS): this means two transactions. One for detaching
- # one for re-attaching, we may want to update
- # within a single transaction instead, or put
- # a whole transaction on top, or handle the switch
- # at db api level automatically within transaction.
- old_policy.detach_port(port['id'])
-
- qos_policy_id = port.get('qos_policy_id')
- if qos_policy_id is not None:
- policy = self._get_policy_obj(context, qos_policy_id)
- policy.attach_port(port['id'])
-
- def register_net_callbacks(self):
- registry.subscribe(self._extend_network_policy_data,
- resources.NETWORK,
- events.AFTER_READ)
-
- def _extend_network_policy_data(self, resource, event, trigger, **kwargs):
- context = kwargs['context']
- network = kwargs['network']
- policy = policy_object.QosPolicy.get_network_policy(
- context, network['id'])
- network['qos_policy_id'] = policy.id if policy else None
-
- def update_network_policy(self, context, network):
- old_policy = policy_object.QosPolicy.get_network_policy(
- context, network['id'])
- if old_policy:
- old_policy.detach_network(network['id'])
-
- qos_policy_id = network.get('qos_policy_id')
- if qos_policy_id:
- policy = self._get_policy_obj(context, qos_policy_id)
- policy.attach_network(network['id'])
-
def create_policy(self, context, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.create()
--- /dev/null
+# Copyright (c) 2015 Red Hat Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.extensions import qos
+from neutron.plugins.common import constants as plugin_constants
+from neutron.services.qos import qos_extension
+from neutron.tests import base
+
+
+def _get_test_dbdata(qos_policy_id):
+ return {'id': None, 'qos_policy_binding': {'policy_id': qos_policy_id,
+ 'network_id': 'fake_net_id'}}
+
+
+class QosResourceExtensionHandlerTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ super(QosResourceExtensionHandlerTestCase, self).setUp()
+ self.ext_handler = qos_extension.QosResourceExtensionHandler()
+ policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy')
+ self.policy_m = policy_p.start()
+
+ def test_process_resource_no_qos_policy_id(self):
+ self.ext_handler.process_resource(None, qos_extension.PORT, {}, None)
+ self.assertFalse(self.policy_m.called)
+
+ def _mock_plugin_loaded(self, plugin_loaded):
+ plugins = {}
+ if plugin_loaded:
+ plugins[plugin_constants.QOS] = None
+ return mock.patch('neutron.manager.NeutronManager.get_service_plugins',
+ return_value=plugins)
+
+ def test_process_resource_no_qos_plugin_loaded(self):
+ with self._mock_plugin_loaded(False):
+ self.ext_handler.process_resource(None, qos_extension.PORT,
+ {qos.QOS_POLICY_ID: None}, None)
+ self.assertFalse(self.policy_m.called)
+
+ def test_process_resource_port_new_policy(self):
+ with self._mock_plugin_loaded(True):
+ qos_policy_id = mock.Mock()
+ actual_port = {'id': mock.Mock(),
+ qos.QOS_POLICY_ID: qos_policy_id}
+ qos_policy = mock.MagicMock()
+ self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
+ self.ext_handler.process_resource(
+ None, qos_extension.PORT, {qos.QOS_POLICY_ID: qos_policy_id},
+ actual_port)
+
+ qos_policy.attach_port.assert_called_once_with(actual_port['id'])
+
+ def test_process_resource_port_updated_policy(self):
+ with self._mock_plugin_loaded(True):
+ qos_policy_id = mock.Mock()
+ port_id = mock.Mock()
+ actual_port = {'id': port_id,
+ qos.QOS_POLICY_ID: qos_policy_id}
+ old_qos_policy = mock.MagicMock()
+ self.policy_m.get_port_policy = mock.Mock(
+ return_value=old_qos_policy)
+ new_qos_policy = mock.MagicMock()
+ self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
+ self.ext_handler.process_resource(
+ None, qos_extension.PORT, {qos.QOS_POLICY_ID: qos_policy_id},
+ actual_port)
+
+ old_qos_policy.detach_port.assert_called_once_with(port_id)
+ new_qos_policy.attach_port.assert_called_once_with(port_id)
+
+ def test_process_resource_network_new_policy(self):
+ with self._mock_plugin_loaded(True):
+ qos_policy_id = mock.Mock()
+ actual_network = {'id': mock.Mock(),
+ qos.QOS_POLICY_ID: qos_policy_id}
+ qos_policy = mock.MagicMock()
+ self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
+ self.ext_handler.process_resource(
+ None, qos_extension.NETWORK,
+ {qos.QOS_POLICY_ID: qos_policy_id}, actual_network)
+
+ qos_policy.attach_network.assert_called_once_with(
+ actual_network['id'])
+
+ def test_process_resource_network_updated_policy(self):
+ with self._mock_plugin_loaded(True):
+ qos_policy_id = mock.Mock()
+ network_id = mock.Mock()
+ actual_network = {'id': network_id,
+ qos.QOS_POLICY_ID: qos_policy_id}
+ old_qos_policy = mock.MagicMock()
+ self.policy_m.get_network_policy = mock.Mock(
+ return_value=old_qos_policy)
+ new_qos_policy = mock.MagicMock()
+ self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
+ self.ext_handler.process_resource(
+ None, qos_extension.NETWORK,
+ {qos.QOS_POLICY_ID: qos_policy_id}, actual_network)
+
+ old_qos_policy.detach_network.assert_called_once_with(network_id)
+ new_qos_policy.attach_network.assert_called_once_with(network_id)
+
+ def test_extract_resource_fields_plugin_not_loaded(self):
+ with self._mock_plugin_loaded(False):
+ fields = self.ext_handler.extract_resource_fields(None, None)
+ self.assertEqual({}, fields)
+
+ def _test_extract_resource_fields_for_port(self, qos_policy_id):
+ with self._mock_plugin_loaded(True):
+ fields = self.ext_handler.extract_resource_fields(
+ qos_extension.PORT, _get_test_dbdata(qos_policy_id))
+ self.assertEqual({qos.QOS_POLICY_ID: qos_policy_id}, fields)
+
+ def test_extract_resource_fields_no_port_policy(self):
+ self._test_extract_resource_fields_for_port(None)
+
+ def test_extract_resource_fields_port_policy_exists(self):
+ qos_policy_id = mock.Mock()
+ self._test_extract_resource_fields_for_port(qos_policy_id)
+
+ def _test_extract_resource_fields_for_network(self, qos_policy_id):
+ with self._mock_plugin_loaded(True):
+ fields = self.ext_handler.extract_resource_fields(
+ qos_extension.NETWORK, _get_test_dbdata(qos_policy_id))
+ self.assertEqual({qos.QOS_POLICY_ID: qos_policy_id}, fields)
+
+ def test_extract_resource_fields_no_network_policy(self):
+ self._test_extract_resource_fields_for_network(None)
+
+ def test_extract_resource_fields_network_policy_exists(self):
+ qos_policy_id = mock.Mock()
+ qos_policy = mock.Mock()
+ qos_policy.id = qos_policy_id
+ self._test_extract_resource_fields_for_network(qos_policy_id)
testdb = neutron.tests.unit.plugins.ml2.drivers.ext_test:TestDBExtensionDriver
port_security = neutron.plugins.ml2.extensions.port_security:PortSecurityExtensionDriver
cisco_n1kv_ext = neutron.plugins.ml2.drivers.cisco.n1kv.n1kv_ext_driver:CiscoN1kvExtensionDriver
+ qos = neutron.plugins.ml2.extensions.qos:QosExtensionDriver
neutron.openstack.common.cache.backends =
memory = neutron.openstack.common.cache._backends.memory:MemoryBackend
neutron.ipam_drivers =