retrieved_policy = retrieved_policy['policy']
self.assertEqual('test-policy', retrieved_policy['name'])
self.assertEqual('test policy desc', retrieved_policy['description'])
- self.assertEqual(False, retrieved_policy['shared'])
+ self.assertFalse(retrieved_policy['shared'])
# Test 'list policies'
policies = self.admin_client.list_qos_policies()['policies']
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test policy desc', retrieved_policy['description'])
- self.assertEqual(True, retrieved_policy['shared'])
+ self.assertTrue(retrieved_policy['shared'])
self.assertEqual([], retrieved_policy['bandwidth_limit_rules'])
@test.attr(type='smoke')
self.assertRaises(exceptions.NotFound,
self.admin_client.show_qos_policy, policy['id'])
+ @test.attr(type='smoke')
+ @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
+ def test_list_rule_types(self):
+ # List supported rule types
+ expected_rule_types = qos_consts.VALID_RULE_TYPES
+ expected_rule_details = ['type']
+
+ rule_types = self.admin_client.list_qos_rule_types()
+ actual_list_rule_types = rule_types['rule_types']
+ actual_rule_types = [rule['type'] for rule in actual_list_rule_types]
+
+ # Verify that only required fields present in rule details
+ for rule in actual_list_rule_types:
+ self.assertEqual(tuple(rule.keys()), tuple(expected_rule_details))
+
+ # Verify if expected rules are present in the actual rules list
+ for rule in expected_rule_types:
+ self.assertIn(rule, actual_rule_types)
+
+ def _disassociate_network(self, client, network_id):
+ client.update_network(network_id, qos_policy_id=None)
+ updated_network = self.admin_client.show_network(network_id)
+ self.assertIsNone(updated_network['network']['qos_policy_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
+ def test_policy_association_with_admin_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ network = self.create_shared_network('test network',
+ qos_policy_id=policy['id'])
+
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertEqual(
+ policy['id'], retrieved_network['network']['qos_policy_id'])
+
+ self._disassociate_network(self.admin_client, network['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
+ def test_policy_association_with_tenant_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_network('test network',
+ qos_policy_id=policy['id'])
+
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertEqual(
+ policy['id'], retrieved_network['network']['qos_policy_id'])
+
+ self._disassociate_network(self.client, network['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
+ def test_policy_association_with_network_non_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ #TODO(QoS): This currently raises an exception on the server side. See
+ # services/qos/qos_extension.py for comments on this subject.
+ network = self.create_network('test network',
+ qos_policy_id=policy['id'])
+
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertIsNone(retrieved_network['network']['qos_policy_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
+ def test_policy_update_association_with_admin_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ network = self.create_shared_network('test network')
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertIsNone(retrieved_network['network']['qos_policy_id'])
+
+ self.admin_client.update_network(network['id'],
+ qos_policy_id=policy['id'])
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertEqual(
+ policy['id'], retrieved_network['network']['qos_policy_id'])
+
+ self._disassociate_network(self.admin_client, network['id'])
+
+ def _disassociate_port(self, port_id):
+ self.client.update_port(port_id, qos_policy_id=None)
+ updated_port = self.admin_client.show_port(port_id)
+ self.assertIsNone(updated_port['port']['qos_policy_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
+ def test_policy_association_with_port_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network('test network')
+ port = self.create_port(network, qos_policy_id=policy['id'])
+
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertEqual(
+ policy['id'], retrieved_port['port']['qos_policy_id'])
+
+ self._disassociate_port(port['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
+ def test_policy_association_with_port_non_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ network = self.create_shared_network('test network')
+ #TODO(QoS): This currently raises an exception on the server side. See
+ # services/qos/qos_extension.py for comments on this subject.
+ port = self.create_port(network, qos_policy_id=policy['id'])
+
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertIsNone(retrieved_port['port']['qos_policy_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
+ def test_policy_update_association_with_port_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network('test network')
+ port = self.create_port(network)
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertIsNone(retrieved_port['port']['qos_policy_id'])
+
+ self.client.update_port(port['id'], qos_policy_id=policy['id'])
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertEqual(
+ policy['id'], retrieved_port['port']['qos_policy_id'])
+
+ self._disassociate_port(port['id'])
+
+
+class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
+ @classmethod
+ def resource_setup(cls):
+ super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()
+ if not test.is_extension_enabled('qos', 'network'):
+ msg = "qos extension not enabled."
+ raise cls.skipException(msg)
+
@test.attr(type='smoke')
@test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
- def test_bandwidth_limit_rule_create(self):
+ def test_rule_create(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
+ @test.attr(type='smoke')
@test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
- def test_bandwidth_limit_rule_update(self):
+ def test_rule_update(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
#TODO(QoS): Uncomment once the rule-delete logic is fixed.
# @test.attr(type='smoke')
# @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
-# def test_bandwidth_limit_rule_delete(self):
+# def test_rule_delete(self):
# policy = self.create_qos_policy(name='test-policy',
# description='test policy',
# shared=False)
# self.admin_client.show_bandwidth_limit_rule,
# policy['id'], rule['id'])
- @test.attr(type='smoke')
- @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
- def test_list_rule_types(self):
- # List supported rule types
- expected_rule_types = qos_consts.VALID_RULE_TYPES
- expected_rule_details = ['type']
-
- rule_types = self.admin_client.list_qos_rule_types()
- actual_list_rule_types = rule_types['rule_types']
- actual_rule_types = [rule['type'] for rule in actual_list_rule_types]
-
- # Verify that only required fields present in rule details
- for rule in actual_list_rule_types:
- self.assertEqual(tuple(rule.keys()), tuple(expected_rule_details))
-
- # Verify if expected rules are present in the actual rules list
- for rule in expected_rule_types:
- self.assertIn(rule, actual_rule_types)
-
#TODO(QoS): create several bandwidth-limit rules (not sure it makes sense,
# but to test more than one rule)
- #TODO(QoS): associate/disassociate policy with network
- #TODO(QoS): associate/disassociate policy with port