fw_rule['id'])
# Clean up QoS policies
for qos_policy in cls.qos_policies:
- cls._try_delete_resource(cls.client.delete_qos_policy,
+ cls._try_delete_resource(cls.admin_client.delete_qos_policy,
qos_policy['id'])
# Clean up QoS rules
for qos_rule in cls.qos_rules:
- cls._try_delete_resource(cls.client.delete_qos_rule,
+ cls._try_delete_resource(cls.admin_client.delete_qos_rule,
qos_rule['id'])
# Clean up ike policies
for ikepolicy in cls.ikepolicies:
@classmethod
def create_qos_policy(cls, name, description, shared):
"""Wrapper utility that returns a test QoS policy."""
- body = cls.client.create_qos_policy(name, description, shared)
+ body = cls.admin_client.create_qos_policy(name, description, shared)
qos_policy = body['policy']
cls.qos_policies.append(qos_policy)
return qos_policy
def create_qos_bandwidth_limit_rule(cls, policy_id,
max_kbps, max_burst_kbps):
"""Wrapper utility that returns a test QoS bandwidth limit rule."""
- body = cls.client.create_bandwidth_limit_rule(
+ body = cls.admin_client.create_bandwidth_limit_rule(
policy_id, max_kbps, max_burst_kbps)
qos_rule = body['bandwidth_limit_rule']
cls.qos_rules.append(qos_rule)
# License for the specific language governing permissions and limitations
# under the License.
+from tempest_lib import exceptions
+
from neutron.services.qos import qos_consts
from neutron.tests.api import base
from neutron.tests.tempest import config
policies_ids = [p['id'] for p in policies]
self.assertIn(policy['id'], policies_ids)
+ @test.attr(type='smoke')
+ @test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
+ def test_policy_update(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='',
+ shared=False)
+ self.admin_client.update_qos_policy(policy['id'],
+ description='test policy desc',
+ shared=True)
+
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ retrieved_policy = retrieved_policy['policy']
+ self.assertEqual('test policy desc', retrieved_policy['description'])
+ self.assertEqual(True, retrieved_policy['shared'])
+ self.assertEqual([], retrieved_policy['bandwidth_limit_rules'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
+ def test_delete_policy(self):
+ policy = self.admin_client.create_qos_policy(
+ 'test-policy', 'desc', True)['policy']
+
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ retrieved_policy = retrieved_policy['policy']
+ self.assertEqual('test-policy', retrieved_policy['name'])
+
+ self.admin_client.delete_qos_policy(policy['id'])
+ self.assertRaises(exceptions.ServerFault,
+ self.admin_client.show_qos_policy, policy['id'])
+
@test.attr(type='smoke')
@test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
- def test_create_rule(self):
+ def test_bandwidth_limit_rule_create(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
max_burst_kbps=1337)
# Test 'show rule'
- retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
+ retrieved_rule = self.admin_client.show_bandwidth_limit_rule(
policy['id'], rule['id'])
- retrieved_policy = retrieved_policy['bandwidth_limit_rule']
- self.assertEqual(rule['id'], retrieved_policy['id'])
- self.assertEqual(200, retrieved_policy['max_kbps'])
- self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
+ retrieved_rule = retrieved_rule['bandwidth_limit_rule']
+ self.assertEqual(rule['id'], retrieved_rule['id'])
+ self.assertEqual(200, retrieved_rule['max_kbps'])
+ self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
# Test 'list rules'
rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
+ # Test 'show policy'
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ policy_rules = retrieved_policy['policy']['bandwidth_limit_rules']
+ self.assertEqual(1, len(policy_rules))
+ self.assertEqual(rule['id'], policy_rules[0]['id'])
+
+ @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
+ def test_bandwidth_limit_rule_update(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
+ max_kbps=1,
+ max_burst_kbps=1)
+
+ self.admin_client.update_bandwidth_limit_rule(policy['id'],
+ rule['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
+ policy['id'], rule['id'])
+ retrieved_policy = retrieved_policy['bandwidth_limit_rule']
+ self.assertEqual(200, retrieved_policy['max_kbps'])
+ self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
+
+ #TODO(QoS): Uncomment once the rule-delete logic is fixed.
+# @test.attr(type='smoke')
+# @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
+# def test_bandwidth_limit_rule_delete(self):
+# policy = self.create_qos_policy(name='test-policy',
+# description='test policy',
+# shared=False)
+# rule = self.admin_client.create_bandwidth_limit_rule(
+# policy['id'], 200, 1337)['bandwidth_limit_rule']
+#
+# retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
+# policy['id'], rule['id'])
+# retrieved_policy = retrieved_policy['bandwidth_limit_rule']
+# self.assertEqual(rule['id'], retrieved_policy['id'])
+#
+# self.admin_client.delete_bandwidth_limit_rule(policy['id'], rule['id']
+# self.assertRaises(exceptions.ServerFault,
+# self.admin_client.show_bandwidth_limit_rule,
+# policy['id'], rule['id'])
+
@test.attr(type='smoke')
@test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
def test_list_rule_types(self):
for rule in expected_rule_types:
self.assertIn(rule, actual_rule_types)
- #TODO(QoS): policy update (name)
#TODO(QoS): create several bandwidth-limit rules (not sure it makes sense,
# but to test more than one rule)
- #TODO(QoS): update bandwidth-limit rule
#TODO(QoS): associate/disassociate policy with network
#TODO(QoS): associate/disassociate policy with port
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
+ def update_qos_policy(self, policy_id, **kwargs):
+ uri = '%s/qos/policies/%s' % (self.uri_prefix, policy_id)
+ post_data = self.serialize({'policy': kwargs})
+ resp, body = self.put(uri, post_data)
+ body = self.deserialize_single(body)
+ self.expected_success(200, resp.status)
+ return service_client.ResponseBody(resp, body)
+
def get_qos_policy(self, policy_id):
uri = '%s/qos/policies/%s' % (self.uri_prefix, policy_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
- def update_bandwidth_limit_rule(self, policy_id, rule_id,
- max_kbps, max_burst_kbps):
+ def update_bandwidth_limit_rule(self, policy_id, rule_id, **kwargs):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
- post_data = {
- 'bandwidth_limit_rule': {
- 'max_kbps': max_kbps,
- 'max_burst_kbps': max_burst_kbps}
- }
+ post_data = {'bandwidth_limit_rule': kwargs}
resp, body = self.put(uri, json.dumps(post_data))
+ body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
+ def delete_bandwidth_limit_rule(self, policy_id, rule_id):
+ uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
+ self.uri_prefix, policy_id, rule_id)
+ resp, body = self.delete(uri)
+ self.expected_success(204, resp.status)
+ return service_client.ResponseBody(resp, body)
+
def list_qos_rule_types(self):
uri = '%s/qos/rule-types' % self.uri_prefix
resp, body = self.get(uri)