--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import log as logging
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks import resources
+from neutron.i18n import _LW
+from neutron.objects.qos import policy as policy_object
+from neutron.services.qos.notification_drivers import qos_base
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _get_qos_policy_cb(resource, policy_id, **kwargs):
+ context = kwargs.get('context')
+ if context is None:
+ LOG.warning(_LW(
+ 'Received %(resource)s %(policy_id)s without context'),
+ {'resource': resource, 'policy_id': policy_id}
+ )
+ return
+
+ policy = policy_object.QosPolicy.get_by_id(context, policy_id)
+ return policy
+
+
+class RpcQosServiceNotificationDriver(
+ qos_base.QosServiceNotificationDriverBase):
+ """RPC message queue service notification driver for QoS."""
+
+ def __init__(self):
+ LOG.debug(
+ "Initializing RPC Messaging Queue notification driver for QoS")
+ rpc_registry.register_provider(
+ _get_qos_policy_cb,
+ resources.QOS_POLICY)
+
+ def create_policy(self, policy):
+ #No need to update agents on create
+ pass
+
+ def update_policy(self, policy):
+ # TODO(QoS): this is temporary until we get notify() implemented
+ try:
+ rpc_registry.notify(resources.QOS_POLICY,
+ events.UPDATED,
+ policy)
+ except NotImplementedError:
+ pass
+
+ def delete_policy(self, policy):
+ # TODO(QoS): this is temporary until we get notify() implemented
+ try:
+ rpc_registry.notify(resources.QOS_POLICY,
+ events.DELETED,
+ policy)
+ except NotImplementedError:
+ pass
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class QosServiceNotificationDriverBase(object):
+ """QoS service notification driver base class."""
+
+ @abc.abstractmethod
+ def create_policy(self, policy):
+ """Create the QoS policy."""
+
+ @abc.abstractmethod
+ def update_policy(self, policy):
+ """Update the QoS policy.
+
+ Apply changes to the QoS policy.
+ """
+
+ @abc.abstractmethod
+ def delete_policy(self, policy):
+ """Delete the QoS policy.
+
+ Remove all rules for this policy and free up all the resources.
+ """
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+from oslo_log import log as logging
-from neutron import manager
-from neutron.api.rpc.callbacks import registry as rpc_registry
-from neutron.api.rpc.callbacks import resources as rpc_resources
from neutron.db import db_base_plugin_common
from neutron.extensions import qos
-from neutron.i18n import _LW
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.objects.qos import rule_type as rule_type_object
-from neutron.plugins.common import constants
-
-from oslo_log import log as logging
+from neutron.services.qos.notification_drivers import message_queue
LOG = logging.getLogger(__name__)
-def _get_qos_policy_cb(resource_type, policy_id, **kwargs):
- qos_plugin = manager.NeutronManager.get_service_plugins().get(
- constants.QOS)
- context = kwargs.get('context')
- if context is None:
- LOG.warning(_LW(
- 'Received %(resource_type)s %(policy_id)s without context'),
- {'resource_type': resource_type, 'policy_id': policy_id}
- )
- return
-
- qos_policy = qos_plugin.get_qos_policy(context, policy_id)
- return qos_policy
-
-
class QoSPlugin(qos.QoSPluginBase):
"""Implementation of the Neutron QoS Service Plugin.
def __init__(self):
super(QoSPlugin, self).__init__()
- rpc_registry.register_provider(
- _get_qos_policy_cb,
- rpc_resources.QOS_POLICY)
+ #TODO(QoS) load from configuration option
+ self.notification_driver = (
+ message_queue.RpcQosServiceNotificationDriver())
def create_policy(self, context, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.create()
+ self.notification_driver.create_policy(policy)
return policy.to_dict()
def update_policy(self, context, policy_id, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.id = policy_id
policy.update()
+ self.notification_driver.update_policy(policy)
return policy.to_dict()
def delete_policy(self, context, policy_id):
policy = policy_object.QosPolicy(context)
policy.id = policy_id
+ self.notification_driver.delete_policy(policy)
policy.delete()
def _get_policy_obj(self, context, policy_id):
return policy_object.QosPolicy.get_by_id(context, policy_id)
+ def _update_policy_on_driver(self, context, policy_id):
+ policy = self._get_policy_obj(context, policy_id)
+ self.notification_driver.update_policy(policy)
+
@db_base_plugin_common.filter_fields
def get_policy(self, context, policy_id, fields=None):
return self._get_policy_obj(context, policy_id).to_dict()
context, qos_policy_id=policy_id,
**bandwidth_limit_rule['bandwidth_limit_rule'])
rule.create()
+ self._update_policy_on_driver(context, policy_id)
return rule.to_dict()
def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
rule.update()
+ self._update_policy_on_driver(context, policy_id)
return rule.to_dict()
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
rule = rule_object.QosBandwidthLimitRule(context)
rule.id = rule_id
rule.delete()
+ self._update_policy_on_driver(context, policy_id)
@db_base_plugin_common.filter_fields
def get_policy_bandwidth_limit_rule(self, context, rule_id,
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import resources
+from neutron import context
+from neutron.objects.qos import policy as policy_object
+from neutron.objects.qos import rule as rule_object
+from neutron.services.qos.notification_drivers import message_queue
+from neutron.tests import base
+
+DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+
+
+class TestQosRpcNotificationDriver(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestQosRpcNotificationDriver, self).setUp()
+
+ registry_p = mock.patch(
+ 'neutron.api.rpc.callbacks.registry.notify')
+ self.registry_m = registry_p.start()
+ self.driver = message_queue.RpcQosServiceNotificationDriver()
+
+ self.policy_data = {'policy': {
+ 'id': 7777777,
+ 'tenant_id': 888888,
+ 'name': 'testi-policy',
+ 'description': 'test policyi description',
+ 'shared': True}}
+
+ self.rule_data = {'bandwidth_limit_rule': {
+ 'id': 7777777,
+ 'max_kbps': 100,
+ 'max_burst_kbps': 150}}
+
+ self.policy = policy_object.QosPolicy(context,
+ **self.policy_data['policy'])
+
+ self.rule = rule_object.QosBandwidthLimitRule(
+ context,
+ **self.rule_data['bandwidth_limit_rule'])
+
+ def _validate_registry_params(self, event_type, policy):
+ self.assertTrue(self.registry_m.called, policy)
+ self.registry_m.assert_called_once_with(
+ resources.QOS_POLICY,
+ event_type,
+ policy)
+
+ def test_create_policy(self):
+ self.driver.create_policy(self.policy)
+ self.assertFalse(self.registry_m.called)
+
+ def test_update_policy(self):
+ self.driver.update_policy(self.policy)
+ self._validate_registry_params(events.UPDATED, self.policy)
+
+ def test_delete_policy(self):
+ self.driver.delete_policy(self.policy)
+ self._validate_registry_params(events.DELETED, self.policy)
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+from oslo_config import cfg
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.callbacks import resources
+from neutron import context
+from neutron import manager
+from neutron.objects.qos import policy as policy_object
+from neutron.objects.qos import rule as rule_object
+from neutron.plugins.common import constants
+from neutron.tests import base
+
+
+DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+
+
+class TestQosPlugin(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestQosPlugin, self).setUp()
+ self.setup_coreplugin()
+
+ mock.patch('neutron.db.api.create_object').start()
+ mock.patch('neutron.db.api.update_object').start()
+ mock.patch('neutron.db.api.delete_object').start()
+ mock.patch('neutron.db.api.get_object').start()
+ mock.patch(
+ 'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
+ self.registry_p = mock.patch(
+ 'neutron.api.rpc.callbacks.registry.notify')
+ self.registry_m = self.registry_p.start()
+ cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
+ cfg.CONF.set_override("service_plugins", ["qos"])
+
+ mgr = manager.NeutronManager.get_instance()
+ self.qos_plugin = mgr.get_service_plugins().get(
+ constants.QOS)
+ self.ctxt = context.Context('fake_user', 'fake_tenant')
+ self.policy_data = {
+ 'policy': {'id': 7777777,
+ 'tenant_id': 888888,
+ 'name': 'test-policy',
+ 'description': 'Test policy description',
+ 'shared': True}}
+
+ self.rule_data = {
+ 'bandwidth_limit_rule': {'id': 7777777,
+ 'max_kbps': 100,
+ 'max_burst_kbps': 150}}
+
+ self.policy = policy_object.QosPolicy(
+ context, **self.policy_data['policy'])
+
+ self.rule = rule_object.QosBandwidthLimitRule(
+ context, **self.rule_data['bandwidth_limit_rule'])
+
+ def _validate_registry_params(self, event_type):
+ self.registry_m.assert_called_once_with(
+ resources.QOS_POLICY,
+ event_type,
+ mock.ANY)
+ self.assertIsInstance(
+ self.registry_m.call_args[0][2], policy_object.QosPolicy)
+
+ def test_qos_plugin_add_policy(self):
+ self.qos_plugin.create_policy(self.ctxt, self.policy_data)
+ self.assertFalse(self.registry_m.called)
+
+ def test_qos_plugin_update_policy(self):
+ self.qos_plugin.update_policy(
+ self.ctxt, self.policy.id, self.policy_data)
+ self._validate_registry_params(events.UPDATED)
+
+ def test_qos_plugin_delete_policy(self):
+ self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
+ self._validate_registry_params(events.DELETED)
+
+ def test_qos_plugin_create_policy_rule(self):
+ self.qos_plugin.create_policy_bandwidth_limit_rule(
+ self.ctxt, self.policy.id, self.rule_data)
+ self._validate_registry_params(events.UPDATED)
+
+ def test_qos_plugin_update_policy_rule(self):
+ self.qos_plugin.update_policy_bandwidth_limit_rule(
+ self.ctxt, self.rule.id, self.policy.id, self.rule_data)
+ self._validate_registry_params(events.UPDATED)
+
+ def test_qos_plugin_delete_policy_rule(self):
+ self.qos_plugin.delete_policy_bandwidth_limit_rule(
+ self.ctxt, self.rule.id, self.policy.id)
+ self._validate_registry_params(events.UPDATED)