The agent based RPC notification driver for message queue is the default.
Added support for multiple notification drivers.
DocImpact
Partially-Implements: blueprint quantum-qos-api
Change-Id: I4108c3d111067d8217bc4112c05e1bde0125e0ef
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from neutron.i18n import _LI
+from neutron import manager
+
+QOS_DRIVER_NAMESPACE = 'neutron.qos.service_notification_drivers'
+QOS_PLUGIN_OPTS = [
+ cfg.ListOpt('service_notification_drivers',
+ default='message_queue',
+ help=_('Drivers list to use to send the update notification')),
+]
+
+cfg.CONF.register_opts(QOS_PLUGIN_OPTS, "qos")
+
+LOG = logging.getLogger(__name__)
+
+
+class QosServiceNotificationDriverManager(object):
+
+ def __init__(self):
+ self.notification_drivers = []
+ self._load_drivers(cfg.CONF.qos.service_notification_drivers)
+
+ def update_policy(self, qos_policy):
+ for driver in self.notification_drivers:
+ driver.update_policy(qos_policy)
+
+ def delete_policy(self, qos_policy):
+ for driver in self.notification_drivers:
+ driver.delete_policy(qos_policy)
+
+ def create_policy(self, qos_policy):
+ for driver in self.notification_drivers:
+ driver.create_policy(qos_policy)
+
+ def _load_drivers(self, notification_drivers):
+ """Load all the instances of the configured QoS notification drivers
+
+ :param notification_drivers: comma separated string
+ """
+ if not notification_drivers:
+ raise SystemExit(_('A QoS driver must be specified'))
+ LOG.debug("Loading QoS notification drivers: %s", notification_drivers)
+ for notification_driver in notification_drivers:
+ driver_ins = self._load_driver_instance(notification_driver)
+ self.notification_drivers.append(driver_ins)
+
+ def _load_driver_instance(self, notification_driver):
+ """Returns an instance of the configured QoS notification driver
+
+ :returns: An instance of Driver for the QoS notification
+ """
+ mgr = manager.NeutronManager
+ driver = mgr.load_class_for_provider(QOS_DRIVER_NAMESPACE,
+ notification_driver)
+ driver_instance = driver()
+ LOG.info(
+ _LI("Loading %(name)s (%(description)s) notification driver "
+ "for QoS plugin"),
+ {"name": notification_driver,
+ "description": driver_instance.get_description()})
+ return driver_instance
"""RPC message queue service notification driver for QoS."""
def __init__(self):
- LOG.debug(
- "Initializing RPC Messaging Queue notification driver for QoS")
rpc_registry.register_provider(
_get_qos_policy_cb,
resources.QOS_POLICY)
+ def get_description(self):
+ return "Message queue updates"
+
def create_policy(self, policy):
#No need to update agents on create
pass
class QosServiceNotificationDriverBase(object):
"""QoS service notification driver base class."""
+ @abc.abstractmethod
+ def get_description(self):
+ """Get the notification driver description.
+ """
+
@abc.abstractmethod
def create_policy(self, policy):
"""Create the QoS policy."""
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.objects.qos import rule_type as rule_type_object
-from neutron.services.qos.notification_drivers import message_queue
+from neutron.services.qos.notification_drivers import manager as driver_mgr
LOG = logging.getLogger(__name__)
def __init__(self):
super(QoSPlugin, self).__init__()
- #TODO(QoS) load from configuration option
- self.notification_driver = (
- message_queue.RpcQosServiceNotificationDriver())
+ self.notification_driver_manager = (
+ driver_mgr.QosServiceNotificationDriverManager())
def create_policy(self, context, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.create()
- self.notification_driver.create_policy(policy)
+ self.notification_driver_manager.create_policy(policy)
return policy.to_dict()
def update_policy(self, context, policy_id, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.id = policy_id
policy.update()
- self.notification_driver.update_policy(policy)
+ self.notification_driver_manager.update_policy(policy)
return policy.to_dict()
def delete_policy(self, context, policy_id):
policy = policy_object.QosPolicy(context)
policy.id = policy_id
- self.notification_driver.delete_policy(policy)
+ self.notification_driver_manager.delete_policy(policy)
policy.delete()
def _get_policy_obj(self, context, policy_id):
def _update_policy_on_driver(self, context, policy_id):
policy = self._get_policy_obj(context, policy_id)
- self.notification_driver.update_policy(policy)
+ self.notification_driver_manager.update_policy(policy)
@db_base_plugin_common.filter_fields
def get_policy(self, context, policy_id, fields=None):
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.services.qos.notification_drivers import qos_base
+
+
+class DummyQosServiceNotificationDriver(
+ qos_base.QosServiceNotificationDriverBase):
+ """Dummy service notification driver for QoS."""
+
+ def get_description(self):
+ return "Dummy"
+
+ def create_policy(self, policy):
+ pass
+
+ def update_policy(self, policy):
+ pass
+
+ def delete_policy(self, policy):
+ pass
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+from oslo_config import cfg
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import resources
+from neutron import context
+from neutron.objects.qos import policy as policy_object
+from neutron.services.qos.notification_drivers import manager as driver_mgr
+from neutron.services.qos.notification_drivers import message_queue
+from neutron.tests import base
+
+DUMMY_DRIVER = ("neutron.tests.unit.services.qos.notification_drivers."
+ "dummy.DummyQosServiceNotificationDriver")
+
+
+def _load_multiple_drivers():
+ cfg.CONF.set_override(
+ "service_notification_drivers",
+ ["message_queue", DUMMY_DRIVER],
+ "qos")
+
+
+class TestQosDriversManager(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestQosDriversManager, self).setUp()
+ self.config_parse()
+ self.setup_coreplugin()
+ self.registry_p = mock.patch(
+ 'neutron.api.rpc.callbacks.registry.notify')
+ self.registry_m = self.registry_p.start()
+ self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
+ config = cfg.ConfigOpts()
+ config.register_opts(driver_mgr.QOS_PLUGIN_OPTS, "qos")
+ self.policy_data = {'policy': {
+ 'id': 7777777,
+ 'tenant_id': 888888,
+ 'name': 'test-policy',
+ 'description': 'test policy description',
+ 'shared': True}}
+
+ self.policy = policy_object.QosPolicy(context,
+ **self.policy_data['policy'])
+ ctxt = None
+ self.kwargs = {'context': ctxt}
+
+ def _validate_registry_params(self, event_type, policy):
+ self.assertTrue(self.registry_m.called, policy)
+ self.registry_m.assert_called_with(
+ resources.QOS_POLICY,
+ event_type,
+ policy)
+
+ def test_create_policy_default_configuration(self):
+ #RPC driver should be loaded by default
+ self.driver_manager.create_policy(self.policy)
+ self.assertFalse(self.registry_m.called)
+
+ def test_update_policy_default_configuration(self):
+ #RPC driver should be loaded by default
+ self.driver_manager.update_policy(self.policy)
+ self._validate_registry_params(events.UPDATED, self.policy)
+
+ def test_delete_policy_default_configuration(self):
+ #RPC driver should be loaded by default
+ self.driver_manager.delete_policy(self.policy)
+ self._validate_registry_params(events.DELETED, self.policy)
+
+ def _test_multi_drivers_configuration_op(self, op):
+ _load_multiple_drivers()
+ # create a new manager with new configuration
+ driver_manager = driver_mgr.QosServiceNotificationDriverManager()
+ handler = '%s_policy' % op
+ with mock.patch('.'.join([DUMMY_DRIVER, handler])) as dummy_mock:
+ rpc_driver = message_queue.RpcQosServiceNotificationDriver
+ with mock.patch.object(rpc_driver, handler) as rpc_mock:
+ getattr(driver_manager, handler)(self.policy)
+ for mock_ in (dummy_mock, rpc_mock):
+ mock_.assert_called_with(self.policy)
+
+ def test_multi_drivers_configuration_create(self):
+ self._test_multi_drivers_configuration_op('create')
+
+ def test_multi_drivers_configuration_update(self):
+ self._test_multi_drivers_configuration_op('update')
+
+ def test_multi_drivers_configuration_delete(self):
+ self._test_multi_drivers_configuration_op('delete')
# These are for backwards compat with Juno vpnaas service provider configuration values
neutron.services.vpn.service_drivers.cisco_ipsec.CiscoCsrIPsecVPNDriver = neutron_vpnaas.services.vpn.service_drivers.cisco_ipsec:CiscoCsrIPsecVPNDriver
neutron.services.vpn.service_drivers.ipsec.IPsecVPNDriver = neutron_vpnaas.services.vpn.service_drivers.ipsec:IPsecVPNDriver
+neutron.qos.service_notification_drivers =
+ message_queue = neutron.services.qos.notification_drivers.message_queue:RpcQosServiceNotificationDriver
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver