raise firewall.FirewallRuleNotFound(firewall_rule_id=
fwrule_id)
elif rules_dict[fwrule_id]['firewall_policy_id']:
- raise firewall.FirewallRuleInUse(
- firewall_rule_id=fwrule_id)
+ if (rules_dict[fwrule_id]['firewall_policy_id'] !=
+ fwp_db['id']):
+ raise firewall.FirewallRuleInUse(
+ firewall_rule_id=fwrule_id)
# New list of rules is valid so we will first reset the existing
# list and then add each rule in order.
# Note that the list could be empty in which case we interpret
fwp_db.firewall_rules = []
for fwrule_id in rule_id_list:
fwp_db.firewall_rules.append(rules_dict[fwrule_id])
+ fwp_db.firewall_rules.reorder()
fwp_db.audited = False
def _process_rule_for_policy(self, context, firewall_policy_id,
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_policy'][k], v)
+ def test_update_firewall_policy_reorder_rules(self):
+ attrs = self._get_test_firewall_policy_attrs()
+
+ with self.firewall_policy() as fwp:
+ with contextlib.nested(self.firewall_rule(name='fwr1',
+ no_delete=True),
+ self.firewall_rule(name='fwr2',
+ no_delete=True),
+ self.firewall_rule(name='fwr3',
+ no_delete=True),
+ self.firewall_rule(name='fwr4',
+ no_delete=True)) as fr:
+ fw_rule_ids = [fr[2]['firewall_rule']['id'],
+ fr[3]['firewall_rule']['id']]
+ data = {'firewall_policy':
+ {'firewall_rules': fw_rule_ids}}
+ req = self.new_update_request('firewall_policies', data,
+ fwp['firewall_policy']['id'])
+ req.get_response(self.ext_api)
+ # shuffle the rules, add more rules
+ fw_rule_ids = [fr[1]['firewall_rule']['id'],
+ fr[3]['firewall_rule']['id'],
+ fr[2]['firewall_rule']['id'],
+ fr[0]['firewall_rule']['id']]
+ attrs['firewall_rules'] = fw_rule_ids
+ data = {'firewall_policy':
+ {'firewall_rules': fw_rule_ids}}
+ req = self.new_update_request('firewall_policies', data,
+ fwp['firewall_policy']['id'])
+ res = self.deserialize(self.fmt,
+ req.get_response(self.ext_api))
+ rules = []
+ for rule_id in fw_rule_ids:
+ req = self.new_show_request('firewall_rules',
+ rule_id,
+ fmt=self.fmt)
+ res = self.deserialize(self.fmt,
+ req.get_response(self.ext_api))
+ rules.append(res['firewall_rule'])
+ self.assertEqual(rules[0]['position'], 1)
+ self.assertEqual(rules[0]['id'], fr[1]['firewall_rule']['id'])
+ self.assertEqual(rules[1]['position'], 2)
+ self.assertEqual(rules[1]['id'], fr[3]['firewall_rule']['id'])
+ self.assertEqual(rules[2]['position'], 3)
+ self.assertEqual(rules[2]['id'], fr[2]['firewall_rule']['id'])
+ self.assertEqual(rules[3]['position'], 4)
+ self.assertEqual(rules[3]['id'], fr[0]['firewall_rule']['id'])
+
def test_update_firewall_policy_with_non_existing_rule(self):
attrs = self._get_test_firewall_policy_attrs()