l2_extensions.wait_until_bandwidth_limit_rule_applied(
self.agent.int_br, port['vif_name'], rule)
+ def _create_port_with_qos(self):
+ port_dict = self._create_test_port_dict()
+ port_dict['qos_policy_id'] = TEST_POLICY_ID1
+ self.setup_agent_and_ports([port_dict])
+ self.wait_until_ports_state(self.ports, up=True)
+ self.wait_until_bandwidth_limit_rule_applied(port_dict,
+ TEST_BW_LIMIT_RULE_1)
+ return port_dict
+
class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
"""Test that qos_policy_id set to None will remove all qos rules from
given port.
"""
- port_dict = self._create_test_port_dict()
- port_dict['qos_policy_id'] = TEST_POLICY_ID1
- self.setup_agent_and_ports([port_dict])
- self.wait_until_ports_state(self.ports, up=True)
- self.wait_until_bandwidth_limit_rule_applied(port_dict,
- TEST_BW_LIMIT_RULE_1)
+ port_dict = self._create_port_with_qos()
port_dict['qos_policy_id'] = None
self.agent.port_update(None, port=port_dict)
"""Test that change of qos policy id on given port refreshes all its
rules.
"""
- port_dict = self._create_test_port_dict()
- port_dict['qos_policy_id'] = TEST_POLICY_ID1
- self.setup_agent_and_ports([port_dict])
- self.wait_until_ports_state(self.ports, up=True)
- self.wait_until_bandwidth_limit_rule_applied(port_dict,
- TEST_BW_LIMIT_RULE_1)
+ port_dict = self._create_port_with_qos()
port_dict['qos_policy_id'] = TEST_POLICY_ID2
self.agent.port_update(None, port=port_dict)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_2)
+
+ def test_policy_rule_delete(self):
+ port_dict = self._create_port_with_qos()
+
+ policy_copy = copy.deepcopy(self.qos_policies[TEST_POLICY_ID1])
+ policy_copy.rules = list()
+ consumer_reg.push(resources.QOS_POLICY, policy_copy, events.UPDATED)
+
+ self.wait_until_bandwidth_limit_rule_applied(port_dict, None)