policy.create()
return policy.to_dict()
- def update_policy(self, context, policy_id, qos_policy):
- policy = policy_object.QosPolicy(context, **qos_policy['policy'])
+ def update_policy(self, context, policy_id, policy):
+ policy = policy_object.QosPolicy(context, **policy['policy'])
policy.id = policy_id
policy.update()
return policy.to_dict()
context, qos_policy_id=policy_id,
**bandwidth_limit_rule['bandwidth_limit_rule'])
rule.create()
- return rule
+ return rule.to_dict()
def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
bandwidth_limit_rule):
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
rule.update()
- return rule
+ return rule.to_dict()
def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
rule = rule_object.QosBandwidthLimitRule()
cls.fw_rules = []
cls.fw_policies = []
cls.ipsecpolicies = []
+ cls.qos_rules = []
+ cls.qos_policies = []
cls.ethertype = "IPv" + str(cls._ip_version)
@classmethod
for fw_rule in cls.fw_rules:
cls._try_delete_resource(cls.client.delete_firewall_rule,
fw_rule['id'])
+ # Clean up QoS policies
+ for qos_policy in cls.qos_policies:
+ cls._try_delete_resource(cls.client.delete_qos_policy,
+ qos_policy['id'])
+ # Clean up QoS rules
+ for qos_rule in cls.qos_rules:
+ cls._try_delete_resource(cls.client.delete_qos_rule,
+ qos_rule['id'])
# Clean up ike policies
for ikepolicy in cls.ikepolicies:
cls._try_delete_resource(cls.client.delete_ikepolicy,
cls.fw_policies.append(fw_policy)
return fw_policy
+ @classmethod
+ def create_qos_policy(cls, name, description, shared):
+ """Wrapper utility that returns a test QoS policy."""
+ body = cls.client.create_qos_policy(name, description, shared)
+ qos_policy = body['policy']
+ cls.qos_policies.append(qos_policy)
+ return qos_policy
+
+ @classmethod
+ def create_qos_bandwidth_limit_rule(cls, policy_id,
+ max_kbps, max_burst_kbps):
+ """Wrapper utility that returns a test QoS bandwidth limit rule."""
+ body = cls.client.create_bandwidth_limit_rule(
+ policy_id, max_kbps, max_burst_kbps)
+ qos_rule = body['bandwidth_limit_rule']
+ cls.qos_rules.append(qos_rule)
+ return qos_rule
+
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])
--- /dev/null
+# Copyright 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.tests.api import base
+from neutron.tests.tempest import config
+from neutron.tests.tempest import test
+
+CONF = config.CONF
+
+
+class QosTestJSON(base.BaseAdminNetworkTest):
+ @classmethod
+ def resource_setup(cls):
+ super(QosTestJSON, cls).resource_setup()
+ if not test.is_extension_enabled('qos', 'network'):
+ msg = "qos extension not enabled."
+ raise cls.skipException(msg)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
+ def test_create_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy desc',
+ shared=False)
+
+ # Test 'show policy'
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ retrieved_policy = retrieved_policy['policy']
+ self.assertEqual('test-policy', retrieved_policy['name'])
+ self.assertEqual('test policy desc', retrieved_policy['description'])
+ self.assertEqual(False, retrieved_policy['shared'])
+
+ # Test 'list policies'
+ policies = self.admin_client.list_qos_policies()['policies']
+ policies_ids = [p['id'] for p in policies]
+ self.assertIn(policy['id'], policies_ids)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
+ def test_create_rule(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ # Test 'show rule'
+ retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
+ policy['id'], rule['id'])
+ retrieved_policy = retrieved_policy['bandwidth_limit_rule']
+ self.assertEqual(rule['id'], retrieved_policy['id'])
+ self.assertEqual(200, retrieved_policy['max_kbps'])
+ self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
+
+ # Test 'list rules'
+ rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])
+ rules = rules['bandwidth_limit_rules']
+ rules_ids = [r['id'] for r in rules]
+ self.assertIn(rule['id'], rules_ids)
+
+ #TODO(QoS): policy update (name)
+ #TODO(QoS): create several bandwidth-limit rules (not sure it makes sense,
+ # but to test more than one rule)
+ #TODO(QoS): update bandwidth-limit rule
+ #TODO(QoS): associate/disassociate policy with network
+ #TODO(QoS): associate/disassociate policy with port
'metering_label_rules': 'metering',
'firewall_rules': 'fw',
'firewall_policies': 'fw',
- 'firewalls': 'fw'
+ 'firewalls': 'fw',
+ 'policies': 'qos',
+ 'bandwidth_limit_rules': 'qos',
}
service_prefix = service_resource_prefix_map.get(
plural_name)
'ikepolicy': 'ikepolicies',
'ipsec_site_connection': 'ipsec-site-connections',
'quotas': 'quotas',
- 'firewall_policy': 'firewall_policies'
+ 'firewall_policy': 'firewall_policies',
+ 'qos_policy': 'policies'
}
return resource_plural_map.get(resource_name, resource_name + 's')
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
+
+ def list_qos_policies(self):
+ uri = '%s/qos/policies' % self.uri_prefix
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ body = json.loads(body)
+ return service_client.ResponseBody(resp, body)
+
+ def create_qos_policy(self, name, description, shared):
+ uri = '%s/qos/policies' % self.uri_prefix
+ post_data = self.serialize(
+ {'policy': {
+ 'name': name,
+ 'description': description,
+ 'shared': shared
+ }})
+ resp, body = self.post(uri, post_data)
+ body = self.deserialize_single(body)
+ self.expected_success(201, resp.status)
+ return service_client.ResponseBody(resp, body)
+
+ def get_qos_policy(self, policy_id):
+ uri = '%s/qos/policies/%s' % (self.uri_prefix, policy_id)
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return service_client.ResponseBody(resp, body)
+
+ def create_bandwidth_limit_rule(self, policy_id, max_kbps, max_burst_kbps):
+ uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
+ self.uri_prefix, policy_id)
+ #TODO(QoS): 'bandwidth_limit' should not be a magic string.
+ post_data = self.serialize(
+ {'bandwidth_limit_rule': {
+ 'max_kbps': max_kbps,
+ 'max_burst_kbps': max_burst_kbps,
+ 'type': 'bandwidth_limit'}})
+ resp, body = self.post(uri, post_data)
+ self.expected_success(201, resp.status)
+ body = json.loads(body)
+ return service_client.ResponseBody(resp, body)
+
+ def list_bandwidth_limit_rules(self, policy_id):
+ uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
+ self.uri_prefix, policy_id)
+ resp, body = self.get(uri)
+ body = self.deserialize_single(body)
+ self.expected_success(200, resp.status)
+ return service_client.ResponseBody(resp, body)
+
+ def show_bandwidth_limit_rule(self, policy_id, rule_id):
+ uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
+ self.uri_prefix, policy_id, rule_id)
+ resp, body = self.get(uri)
+ body = self.deserialize_single(body)
+ self.expected_success(200, resp.status)
+ return service_client.ResponseBody(resp, body)
+
+ def update_bandwidth_limit_rule(self, policy_id, rule_id,
+ max_kbps, max_burst_kbps):
+ uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
+ self.uri_prefix, policy_id, rule_id)
+ post_data = {
+ 'bandwidth_limit_rule': {
+ 'max_kbps': max_kbps,
+ 'max_burst_kbps': max_burst_kbps,
+ 'type': 'bandwidth_limit'}}
+ resp, body = self.put(uri, json.dumps(post_data))
+ self.expected_success(200, resp.status)
+ return service_client.ResponseBody(resp, body)