from oslo_log import log as logging
+from neutron.common import exceptions as n_exc
from neutron.db import db_base_plugin_common
from neutron.extensions import qos
from neutron.objects.qos import policy as policy_object
policy.delete()
def _get_policy_obj(self, context, policy_id):
- return policy_object.QosPolicy.get_by_id(context, policy_id)
-
- def _update_policy_on_driver(self, context, policy_id):
- policy = self._get_policy_obj(context, policy_id)
- self.notification_driver_manager.update_policy(policy)
+ obj = policy_object.QosPolicy.get_by_id(context, policy_id)
+ if obj is None:
+ raise n_exc.QosPolicyNotFound(policy_id=policy_id)
+ return obj
@db_base_plugin_common.filter_fields
def get_policy(self, context, policy_id, fields=None):
# in the future we need an inter-rule validation
# mechanism to verify all created rules will
# play well together.
+ # validate that we have access to the policy
+ policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(
context, qos_policy_id=policy_id,
**bandwidth_limit_rule['bandwidth_limit_rule'])
rule.create()
- self._update_policy_on_driver(context, policy_id)
+ self.notification_driver_manager.update_policy(policy)
return rule.to_dict()
def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
bandwidth_limit_rule):
+ # validate that we have access to the policy
+ policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
rule.update()
- self._update_policy_on_driver(context, policy_id)
+ self.notification_driver_manager.update_policy(policy)
return rule.to_dict()
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
+ # validate that we have access to the policy
+ policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(context)
rule.id = rule_id
rule.delete()
- self._update_policy_on_driver(context, policy_id)
+ self.notification_driver_manager.update_policy(policy)
@db_base_plugin_common.filter_fields
def get_policy_bandwidth_limit_rule(self, context, rule_id,
policy_id, fields=None):
+ # validate that we have access to the policy
+ self._get_policy_obj(context, policy_id)
return rule_object.QosBandwidthLimitRule.get_by_id(context,
rule_id).to_dict()
sorts=None, limit=None,
marker=None, page_reverse=False):
#TODO(QoS): Support all the optional parameters
+ # validate that we have access to the policy
+ self._get_policy_obj(context, policy_id)
return [rule_obj.to_dict() for rule_obj in
rule_object.QosBandwidthLimitRule.get_objects(context)]
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
+from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
from neutron.objects.qos import policy as policy_object
self.assertIsInstance(
self.registry_m.call_args[0][2], policy_object.QosPolicy)
- def test_qos_plugin_add_policy(self):
+ def test_add_policy(self):
self.qos_plugin.create_policy(self.ctxt, self.policy_data)
self.assertFalse(self.registry_m.called)
- def test_qos_plugin_update_policy(self):
+ def test_update_policy(self):
self.qos_plugin.update_policy(
self.ctxt, self.policy.id, self.policy_data)
self._validate_registry_params(events.UPDATED)
- def test_qos_plugin_delete_policy(self):
+ def test_delete_policy(self):
self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
self._validate_registry_params(events.DELETED)
- def test_qos_plugin_create_policy_rule(self):
- self.qos_plugin.create_policy_bandwidth_limit_rule(
- self.ctxt, self.policy.id, self.rule_data)
- self._validate_registry_params(events.UPDATED)
-
- def test_qos_plugin_update_policy_rule(self):
- self.qos_plugin.update_policy_bandwidth_limit_rule(
- self.ctxt, self.rule.id, self.policy.id, self.rule_data)
- self._validate_registry_params(events.UPDATED)
-
- def test_qos_plugin_delete_policy_rule(self):
- self.qos_plugin.delete_policy_bandwidth_limit_rule(
- self.ctxt, self.rule.id, self.policy.id)
- self._validate_registry_params(events.UPDATED)
+ def test_create_policy_rule(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=self.policy):
+ self.qos_plugin.create_policy_bandwidth_limit_rule(
+ self.ctxt, self.policy.id, self.rule_data)
+ self._validate_registry_params(events.UPDATED)
+
+ def test_update_policy_rule(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=self.policy):
+ self.qos_plugin.update_policy_bandwidth_limit_rule(
+ self.ctxt, self.rule.id, self.policy.id, self.rule_data)
+ self._validate_registry_params(events.UPDATED)
+
+ def test_delete_policy_rule(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=self.policy):
+ self.qos_plugin.delete_policy_bandwidth_limit_rule(
+ self.ctxt, self.rule.id, self.policy.id)
+ self._validate_registry_params(events.UPDATED)
+
+ def test_get_policy_for_nonexistent_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=None):
+ self.assertRaises(
+ n_exc.QosPolicyNotFound,
+ self.qos_plugin.get_policy,
+ self.ctxt, self.policy.id)
+
+ def test_get_policy_bandwidth_limit_rule_for_nonexistent_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=None):
+ self.assertRaises(
+ n_exc.QosPolicyNotFound,
+ self.qos_plugin.get_policy_bandwidth_limit_rule,
+ self.ctxt, self.rule.id, self.policy.id)
+
+ def test_get_policy_bandwidth_limit_rules_for_nonexistent_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=None):
+ self.assertRaises(
+ n_exc.QosPolicyNotFound,
+ self.qos_plugin.get_policy_bandwidth_limit_rules,
+ self.ctxt, self.policy.id)
+
+ def test_create_policy_rule_for_nonexistent_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=None):
+ self.assertRaises(
+ n_exc.QosPolicyNotFound,
+ self.qos_plugin.create_policy_bandwidth_limit_rule,
+ self.ctxt, self.policy.id, self.rule_data)
+
+ def test_update_policy_rule_for_nonexistent_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=None):
+ self.assertRaises(
+ n_exc.QosPolicyNotFound,
+ self.qos_plugin.update_policy_bandwidth_limit_rule,
+ self.ctxt, self.rule.id, self.policy.id, self.rule_data)
+
+ def test_delete_policy_rule_for_nonexistent_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=None):
+ self.assertRaises(
+ n_exc.QosPolicyNotFound,
+ self.qos_plugin.delete_policy_bandwidth_limit_rule,
+ self.ctxt, self.rule.id, self.policy.id)