def delete_port(self, context, port):
self._process_reset_port(port)
+ def _policy_rules_modified(self, old_policy, policy):
+ return not (len(old_policy.rules) == len(policy.rules) and
+ all(i in old_policy.rules for i in policy.rules))
+
def _process_update_policy(self, qos_policy):
old_qos_policy = self.policy_map.get_policy(qos_policy.id)
- for port in self.policy_map.get_ports(qos_policy):
- #NOTE(QoS): for now, just reflush the rules on the port. Later, we
- # may want to apply the difference between the old and
- # new rule lists.
- self.qos_driver.delete(port, old_qos_policy)
- self.qos_driver.update(port, qos_policy)
+ if old_qos_policy:
+ if self._policy_rules_modified(old_qos_policy, qos_policy):
+ for port in self.policy_map.get_ports(qos_policy):
+ #NOTE(QoS): for now, just reflush the rules on the port.
+ # Later, we may want to apply the difference
+ # between the old and new rule lists.
+ self.qos_driver.delete(port, old_qos_policy)
+ self.qos_driver.update(port, qos_policy)
self.policy_map.update_policy(qos_policy)
def _process_reset_port(self, port):
from neutron.tests import base
-TEST_POLICY = policy.QosPolicy(context=None,
- name='test1', id='fake_policy_id')
+BASE_TEST_POLICY = {'context': None,
+ 'name': 'test1',
+ 'id': 'fake_policy_id'}
+
+TEST_POLICY = policy.QosPolicy(**BASE_TEST_POLICY)
+
+TEST_POLICY_DESCR = policy.QosPolicy(description='fake_descr',
+ **BASE_TEST_POLICY)
+
TEST_POLICY2 = policy.QosPolicy(context=None,
name='test2', id='fake_policy_id_2')
port2 = self._create_test_port_dict(qos_policy_id=TEST_POLICY2.id)
self.qos_ext.policy_map.set_port_policy(port1, TEST_POLICY)
self.qos_ext.policy_map.set_port_policy(port2, TEST_POLICY2)
+ self.qos_ext._policy_rules_modified = mock.Mock(return_value=True)
policy_obj = mock.Mock()
policy_obj.id = port1['qos_policy_id']
self.qos_ext._process_update_policy(policy_obj)
self.qos_ext.qos_driver.update.assert_called_with(port2, policy_obj)
+ def test__process_update_policy_descr_not_propagated_into_driver(self):
+ port = self._create_test_port_dict(qos_policy_id=TEST_POLICY.id)
+ self.qos_ext.policy_map.set_port_policy(port, TEST_POLICY)
+ self.qos_ext._policy_rules_modified = mock.Mock(return_value=False)
+ self.qos_ext._process_update_policy(TEST_POLICY_DESCR)
+ self.qos_ext._policy_rules_modified.assert_called_with(TEST_POLICY,
+ TEST_POLICY_DESCR)
+ self.assertFalse(self.qos_ext.qos_driver.delete.called)
+ self.assertFalse(self.qos_ext.qos_driver.update.called)
+ self.assertEqual(TEST_POLICY_DESCR,
+ self.qos_ext.policy_map.get_policy(TEST_POLICY.id))
+
+ def test__process_update_policy_not_known(self):
+ self.qos_ext._policy_rules_modified = mock.Mock()
+ self.qos_ext._process_update_policy(TEST_POLICY_DESCR)
+ self.assertFalse(self.qos_ext._policy_rules_modified.called)
+ self.assertFalse(self.qos_ext.qos_driver.delete.called)
+ self.assertFalse(self.qos_ext.qos_driver.update.called)
+ self.assertIsNone(self.qos_ext.policy_map.get_policy(
+ TEST_POLICY_DESCR.id))
+
def test__process_reset_port(self):
port1 = self._create_test_port_dict(qos_policy_id=TEST_POLICY.id)
port2 = self._create_test_port_dict(qos_policy_id=TEST_POLICY2.id)
subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)
+class QosExtensionReflushRulesTestCase(QosExtensionBaseTestCase):
+
+ def setUp(self):
+ super(QosExtensionReflushRulesTestCase, self).setUp()
+ self.qos_ext.initialize(
+ self.connection, constants.EXTENSION_DRIVER_TYPE)
+
+ self.pull_mock = mock.patch.object(
+ self.qos_ext.resource_rpc, 'pull',
+ return_value=TEST_POLICY).start()
+
+ self.policy = policy.QosPolicy(**BASE_TEST_POLICY)
+ self.rule = (
+ rule.QosBandwidthLimitRule(context=None, id='fake_rule_id',
+ qos_policy_id=self.policy.id,
+ max_kbps=100, max_burst_kbps=10))
+ self.policy.rules = [self.rule]
+ self.port = {'port_id': uuidutils.generate_uuid(),
+ 'qos_policy_id': TEST_POLICY.id}
+ self.new_policy = policy.QosPolicy(description='descr',
+ **BASE_TEST_POLICY)
+
+ def test_is_reflush_required_change_policy_descr(self):
+ self.qos_ext.policy_map.set_port_policy(self.port, self.policy)
+ self.new_policy.rules = [self.rule]
+ self.assertFalse(self.qos_ext._policy_rules_modified(self.policy,
+ self.new_policy))
+
+ def test_is_reflush_required_change_policy_rule(self):
+ self.qos_ext.policy_map.set_port_policy(self.port, self.policy)
+ updated_rule = (rule.QosBandwidthLimitRule(context=None,
+ id='fake_rule_id',
+ qos_policy_id=self.policy.id,
+ max_kbps=200,
+ max_burst_kbps=20))
+ self.new_policy.rules = [updated_rule]
+ self.assertTrue(self.qos_ext._policy_rules_modified(self.policy,
+ self.new_policy))
+
+ def test_is_reflush_required_remove_rules(self):
+ self.qos_ext.policy_map.set_port_policy(self.port, self.policy)
+ self.new_policy.rules = []
+ self.assertTrue(self.qos_ext._policy_rules_modified(self.policy,
+ self.new_policy))
+
+ def test_is_reflush_required_add_rules(self):
+ self.qos_ext.policy_map.set_port_policy(self.port, self.policy)
+ self.new_policy.rules = [self.rule]
+ fake_rule = QosFakeRule(context=None, id='really_fake_rule_id',
+ qos_policy_id=self.policy.id)
+ self.new_policy.rules.append(fake_rule)
+ self.assertTrue(self.qos_ext._policy_rules_modified(self.policy,
+ self.new_policy))
+
+
class PortPolicyMapTestCase(base.BaseTestCase):
def setUp(self):