setattr(self, 'rules', rules)
self.obj_reset_changes(['rules'])
+ def get_rule_by_id(self, rule_id):
+ """Return rule specified by rule_id.
+
+ @raise QosRuleNotFound: if there is no such rule in the policy.
+ """
+
+ for rule in self.rules:
+ if rule_id == rule.id:
+ return rule
+ raise exceptions.QosRuleNotFound(policy_id=self.id,
+ rule_id=rule_id)
+
@staticmethod
def _is_policy_accessible(context, db_obj):
#TODO(QoS): Look at I3426b13eede8bfa29729cf3efea3419fb91175c4 for
with db_api.autonested_transaction(context.session):
# first, validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
+ # check if the rule belong to the policy
+ policy.get_rule_by_id(rule_id)
rule = rule_object.QosBandwidthLimitRule(
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
with db_api.autonested_transaction(context.session):
# first, validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
- rule = rule_object.QosBandwidthLimitRule(context)
- rule.id = rule_id
+ rule = policy.get_rule_by_id(rule_id)
rule.delete()
policy.reload_rules()
self.notification_driver_manager.update_policy(context, policy)
self._validate_notif_driver_params('update_policy')
def test_update_policy_rule(self):
+ _policy = policy_object.QosPolicy(
+ self.ctxt, **self.policy_data['policy'])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
- return_value=self.policy):
+ return_value=_policy):
+ setattr(_policy, "rules", [self.rule])
self.qos_plugin.update_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id, self.rule_data)
self._validate_notif_driver_params('update_policy')
+ def test_update_policy_rule_bad_policy(self):
+ _policy = policy_object.QosPolicy(
+ self.ctxt, **self.policy_data['policy'])
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=_policy):
+ setattr(_policy, "rules", [])
+ self.assertRaises(
+ n_exc.QosRuleNotFound,
+ self.qos_plugin.update_policy_bandwidth_limit_rule,
+ self.ctxt, self.rule.id, self.policy.id,
+ self.rule_data)
+
def test_delete_policy_rule(self):
+ _policy = policy_object.QosPolicy(
+ self.ctxt, **self.policy_data['policy'])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
- return_value=self.policy):
+ return_value=_policy):
+ setattr(_policy, "rules", [self.rule])
self.qos_plugin.delete_policy_bandwidth_limit_rule(
- self.ctxt, self.rule.id, self.policy.id)
+ self.ctxt, self.rule.id, _policy.id)
self._validate_notif_driver_params('update_policy')
+ def test_delete_policy_rule_bad_policy(self):
+ _policy = policy_object.QosPolicy(
+ self.ctxt, **self.policy_data['policy'])
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=_policy):
+ setattr(_policy, "rules", [])
+ self.assertRaises(
+ n_exc.QosRuleNotFound,
+ self.qos_plugin.delete_policy_bandwidth_limit_rule,
+ self.ctxt, self.rule.id, _policy.id)
+
def test_get_policy_bandwidth_limit_rules_for_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):