'enabled': firewall_rule['enabled']}
return self._fields(res, fields)
- def _set_rules_for_policy(self, context, firewall_policy_db, rule_id_list):
+ def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
+ rule_id_list = fwp['firewall_rules']
fwp_db = firewall_policy_db
with context.session.begin(subtransactions=True):
if not rule_id_list:
fwp_db['id']):
raise firewall.FirewallRuleInUse(
firewall_rule_id=fwrule_id)
+ if 'shared' in fwp:
+ if fwp['shared'] and not rules_dict[fwrule_id]['shared']:
+ raise firewall.FirewallRuleSharingConflict(
+ firewall_rule_id=fwrule_id,
+ firewall_policy_id=fwp_db['id'])
+ elif fwp_db['shared'] and not rules_dict[fwrule_id]['shared']:
+ raise firewall.FirewallRuleSharingConflict(
+ firewall_rule_id=fwrule_id,
+ firewall_policy_id=fwp_db['id'])
# New list of rules is valid so we will first reset the existing
# list and then add each rule in order.
# Note that the list could be empty in which case we interpret
fwp_db.firewall_rules.reorder()
fwp_db.audited = False
+ def _check_unshared_rules_for_policy(self, fwp_db, fwp):
+ if fwp['shared']:
+ rules_in_db = fwp_db['firewall_rules']
+ for fwr_db in rules_in_db:
+ if not fwr_db['shared']:
+ raise firewall.FirewallPolicySharingConflict(
+ firewall_rule_id=fwr_db['id'],
+ firewall_policy_id=fwp_db['id'])
+
def _process_rule_for_policy(self, context, firewall_policy_id,
firewall_rule_db, position):
with context.session.begin(subtransactions=True):
description=fwp['description'],
shared=fwp['shared'])
context.session.add(fwp_db)
- self._set_rules_for_policy(context, fwp_db,
- fwp['firewall_rules'])
+ self._set_rules_for_policy(context, fwp_db, fwp)
fwp_db.audited = fwp['audited']
return self._make_firewall_policy_dict(fwp_db)
fwp = firewall_policy['firewall_policy']
with context.session.begin(subtransactions=True):
fwp_db = self._get_firewall_policy(context, id)
- if 'firewall_rules' in fwp:
- self._set_rules_for_policy(context, fwp_db,
- fwp['firewall_rules'])
+ # check any existing rules are not shared
+ if 'shared' in fwp and 'firewall_rules' not in fwp:
+ self._check_unshared_rules_for_policy(fwp_db, fwp)
+ elif 'firewall_rules' in fwp:
+ self._set_rules_for_policy(context, fwp_db, fwp)
del fwp['firewall_rules']
if 'audited' not in fwp or fwp['audited']:
fwp['audited'] = False
message = _("Firewall Policy %(firewall_policy_id)s is being used.")
+class FirewallRuleSharingConflict(qexception.Conflict):
+
+ """FWaaS exception for firewall rules
+
+ When a shared policy is created or updated with unshared rules,
+ this exception will be raised.
+ """
+ message = _("Operation cannot be performed since Firewall Policy "
+ "%(firewall_policy_id)s is shared but Firewall Rule "
+ "%(firewall_rule_id)s is not shared")
+
+
+class FirewallPolicySharingConflict(qexception.Conflict):
+
+ """FWaaS exception for firewall policy
+
+ When a policy is shared without sharing its associated rules,
+ this exception will be raised.
+ """
+ message = _("Operation cannot be performed. Before sharing Firewall "
+ "Policy %(firewall_policy_id)s, share associated Firewall "
+ "Rule %(firewall_rule_id)s")
+
+
class FirewallRuleNotFound(qexception.NotFound):
message = _("Firewall Rule %(firewall_rule_id)s could not be found.")
audited=AUDITED)
self.assertEqual(res.status_int, 409)
+ def test_create_shared_firewall_policy_with_unshared_rule(self):
+ with self.firewall_rule(shared=False) as fwr:
+ fw_rule_ids = [fwr['firewall_rule']['id']]
+ res = self._create_firewall_policy(
+ None, 'firewall_policy1', description=DESCRIPTION, shared=True,
+ firewall_rules=fw_rule_ids, audited=AUDITED)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
def test_show_firewall_policy(self):
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_policy'][k], v)
+ def test_update_shared_firewall_policy_with_unshared_rule(self):
+ with self.firewall_rule(name='fwr1', shared=False) as fr:
+ with self.firewall_policy() as fwp:
+ fw_rule_ids = [fr['firewall_rule']['id']]
+ # update shared policy with unshared rule
+ data = {'firewall_policy':
+ {'firewall_rules': fw_rule_ids}}
+ req = self.new_update_request('firewall_policies', data,
+ fwp['firewall_policy']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
+ def test_update_firewall_policy_with_shared_attr_unshared_rule(self):
+ with self.firewall_rule(name='fwr1', shared=False) as fr:
+ with self.firewall_policy(shared=False) as fwp:
+ fw_rule_ids = [fr['firewall_rule']['id']]
+ # update shared policy with shared attr and unshared rule
+ data = {'firewall_policy': {'shared': True,
+ 'firewall_rules': fw_rule_ids}}
+ req = self.new_update_request('firewall_policies', data,
+ fwp['firewall_policy']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
+ def test_update_firewall_policy_with_shared_attr_exist_unshare_rule(self):
+ with self.firewall_rule(name='fwr1', shared=False) as fr:
+ fw_rule_ids = [fr['firewall_rule']['id']]
+ with self.firewall_policy(shared=False,
+ firewall_rules=fw_rule_ids) as fwp:
+ # update policy with shared attr
+ data = {'firewall_policy': {'shared': True}}
+ req = self.new_update_request('firewall_policies', data,
+ fwp['firewall_policy']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
def test_delete_firewall_policy(self):
ctx = context.get_admin_context()
with self.firewall_policy(do_delete=False) as fwp: