from neutron.objects.qos import rule as rule_object
from neutron.objects.qos import rule_type as rule_type_object
from neutron.services.qos.notification_drivers import manager as driver_mgr
+from neutron.services.qos import qos_consts
LOG = logging.getLogger(__name__)
with db_api.autonested_transaction(context.session):
# first, validate that we have access to the policy
self._get_policy_obj(context, policy_id)
+ filters = filters or dict()
+ filters[qos_consts.QOS_POLICY_ID] = policy_id
return rule_object.QosBandwidthLimitRule.get_objects(context,
**filters)
exceptions.Forbidden,
self.client.create_bandwidth_limit_rule,
'policy', 1, 2)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
+ def test_get_rules_by_policy(self):
+ policy1 = self.create_qos_policy(name='test-policy1',
+ description='test policy1',
+ shared=False)
+ rule1 = self.create_qos_bandwidth_limit_rule(policy_id=policy1['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ policy2 = self.create_qos_policy(name='test-policy2',
+ description='test policy2',
+ shared=False)
+ rule2 = self.create_qos_bandwidth_limit_rule(policy_id=policy2['id'],
+ max_kbps=5000,
+ max_burst_kbps=2523)
+
+ # Test 'list rules'
+ rules = self.admin_client.list_bandwidth_limit_rules(policy1['id'])
+ rules = rules['bandwidth_limit_rules']
+ rules_ids = [r['id'] for r in rules]
+ self.assertIn(rule1['id'], rules_ids)
+ self.assertNotIn(rule2['id'], rules_ids)
self.ctxt, self.rule.id, self.policy.id)
self._validate_notif_driver_params('update_policy')
+ def test_get_policy_bandwidth_limit_rules_for_policy(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=self.policy):
+ with mock.patch('neutron.objects.qos.rule.'
+ 'QosBandwidthLimitRule.'
+ 'get_objects') as get_object_mock:
+ self.qos_plugin.get_policy_bandwidth_limit_rules(
+ self.ctxt, self.policy.id)
+ get_object_mock.assert_called_once_with(
+ self.ctxt, qos_policy_id=self.policy.id)
+
+ def test_get_policy_bandwidth_limit_rules_for_policy_with_filters(self):
+ with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
+ return_value=self.policy):
+ with mock.patch('neutron.objects.qos.rule.'
+ 'QosBandwidthLimitRule.'
+ 'get_objects') as get_object_mock:
+
+ filters = {'filter': 'filter_id'}
+ self.qos_plugin.get_policy_bandwidth_limit_rules(
+ self.ctxt, self.policy.id, filters=filters)
+ get_object_mock.assert_called_once_with(
+ self.ctxt, qos_policy_id=self.policy.id,
+ filter='filter_id')
+
def test_get_policy_for_nonexistent_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=None):