]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
shared policy shouldn't have unshared rules
authorKoteswara Rao Kelam <koteswara.kelam@hp.com>
Thu, 21 Aug 2014 09:33:57 +0000 (02:33 -0700)
committerKoteswara Rao Kelam <koteswara.kelam@hp.com>
Thu, 4 Sep 2014 05:45:32 +0000 (22:45 -0700)
A shared firewall policy should always have shared rules. So the following
cases should not be allowed:
1.Create shared policy with unshared rules
2.Update policy shared=True when it has unshared rules
3.Update policy with shared=True and unshared rules
4.Update shared policy with unshared rules

Change-Id: I3d71899c328d3fefa96c1f99d6ba706160e445cc
Closes-bug: 1334981

neutron/db/firewall/firewall_db.py
neutron/extensions/firewall.py
neutron/tests/unit/db/firewall/test_db_firewall.py

index 1a1bafee68474ae1a98321e5af1b34726c4b4d53..8704c76a57680ddc3b5b5afdbcfcd118bec51af4 100644 (file)
@@ -164,7 +164,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
                'enabled': firewall_rule['enabled']}
         return self._fields(res, fields)
 
-    def _set_rules_for_policy(self, context, firewall_policy_db, rule_id_list):
+    def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
+        rule_id_list = fwp['firewall_rules']
         fwp_db = firewall_policy_db
         with context.session.begin(subtransactions=True):
             if not rule_id_list:
@@ -188,6 +189,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
                             fwp_db['id']):
                         raise firewall.FirewallRuleInUse(
                             firewall_rule_id=fwrule_id)
+                if 'shared' in fwp:
+                    if fwp['shared'] and not rules_dict[fwrule_id]['shared']:
+                        raise firewall.FirewallRuleSharingConflict(
+                            firewall_rule_id=fwrule_id,
+                            firewall_policy_id=fwp_db['id'])
+                elif fwp_db['shared'] and not rules_dict[fwrule_id]['shared']:
+                    raise firewall.FirewallRuleSharingConflict(
+                        firewall_rule_id=fwrule_id,
+                        firewall_policy_id=fwp_db['id'])
             # New list of rules is valid so we will first reset the existing
             # list and then add each rule in order.
             # Note that the list could be empty in which case we interpret
@@ -198,6 +208,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
             fwp_db.firewall_rules.reorder()
             fwp_db.audited = False
 
+    def _check_unshared_rules_for_policy(self, fwp_db, fwp):
+        if fwp['shared']:
+            rules_in_db = fwp_db['firewall_rules']
+            for fwr_db in rules_in_db:
+                if not fwr_db['shared']:
+                    raise firewall.FirewallPolicySharingConflict(
+                        firewall_rule_id=fwr_db['id'],
+                        firewall_policy_id=fwp_db['id'])
+
     def _process_rule_for_policy(self, context, firewall_policy_id,
                                  firewall_rule_db, position):
         with context.session.begin(subtransactions=True):
@@ -304,8 +323,7 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
                                     description=fwp['description'],
                                     shared=fwp['shared'])
             context.session.add(fwp_db)
-            self._set_rules_for_policy(context, fwp_db,
-                                       fwp['firewall_rules'])
+            self._set_rules_for_policy(context, fwp_db, fwp)
             fwp_db.audited = fwp['audited']
         return self._make_firewall_policy_dict(fwp_db)
 
@@ -314,9 +332,11 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
         fwp = firewall_policy['firewall_policy']
         with context.session.begin(subtransactions=True):
             fwp_db = self._get_firewall_policy(context, id)
-            if 'firewall_rules' in fwp:
-                self._set_rules_for_policy(context, fwp_db,
-                                           fwp['firewall_rules'])
+            # check any existing rules are not shared
+            if 'shared' in fwp and 'firewall_rules' not in fwp:
+                self._check_unshared_rules_for_policy(fwp_db, fwp)
+            elif 'firewall_rules' in fwp:
+                self._set_rules_for_policy(context, fwp_db, fwp)
                 del fwp['firewall_rules']
             if 'audited' not in fwp or fwp['audited']:
                 fwp['audited'] = False
index ff0fd39fb1fbc9e3e034903a04bba9b3e6114846..ad930d01de2587f1f6c02829d259e34393cac810 100644 (file)
@@ -54,6 +54,30 @@ class FirewallPolicyInUse(qexception.InUse):
     message = _("Firewall Policy %(firewall_policy_id)s is being used.")
 
 
+class FirewallRuleSharingConflict(qexception.Conflict):
+
+    """FWaaS exception for firewall rules
+
+    When a shared policy is created or updated with unshared rules,
+    this exception will be raised.
+    """
+    message = _("Operation cannot be performed since Firewall Policy "
+                "%(firewall_policy_id)s is shared but Firewall Rule "
+                "%(firewall_rule_id)s is not shared")
+
+
+class FirewallPolicySharingConflict(qexception.Conflict):
+
+    """FWaaS exception for firewall policy
+
+    When a policy is shared without sharing its associated rules,
+    this exception will be raised.
+    """
+    message = _("Operation cannot be performed. Before sharing Firewall "
+                "Policy %(firewall_policy_id)s, share associated Firewall "
+                "Rule %(firewall_rule_id)s")
+
+
 class FirewallRuleNotFound(qexception.NotFound):
     message = _("Firewall Rule %(firewall_rule_id)s could not be found.")
 
index 5e27f5fc46c938e0815580b6c70db8250b04e214..a4d29450dc1e0a8dc9e4b05c6af10a2900514cd7 100644 (file)
@@ -347,6 +347,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
                     audited=AUDITED)
                 self.assertEqual(res.status_int, 409)
 
+    def test_create_shared_firewall_policy_with_unshared_rule(self):
+        with self.firewall_rule(shared=False) as fwr:
+            fw_rule_ids = [fwr['firewall_rule']['id']]
+            res = self._create_firewall_policy(
+                None, 'firewall_policy1', description=DESCRIPTION, shared=True,
+                firewall_rules=fw_rule_ids, audited=AUDITED)
+            self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
     def test_show_firewall_policy(self):
         name = "firewall_policy1"
         attrs = self._get_test_firewall_policy_attrs(name)
@@ -520,6 +528,42 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
                 for k, v in attrs.iteritems():
                     self.assertEqual(res['firewall_policy'][k], v)
 
+    def test_update_shared_firewall_policy_with_unshared_rule(self):
+        with self.firewall_rule(name='fwr1', shared=False) as fr:
+            with self.firewall_policy() as fwp:
+                fw_rule_ids = [fr['firewall_rule']['id']]
+                # update shared policy with unshared rule
+                data = {'firewall_policy':
+                        {'firewall_rules': fw_rule_ids}}
+                req = self.new_update_request('firewall_policies', data,
+                                              fwp['firewall_policy']['id'])
+                res = req.get_response(self.ext_api)
+                self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
+    def test_update_firewall_policy_with_shared_attr_unshared_rule(self):
+        with self.firewall_rule(name='fwr1', shared=False) as fr:
+            with self.firewall_policy(shared=False) as fwp:
+                fw_rule_ids = [fr['firewall_rule']['id']]
+                # update shared policy with shared attr and unshared rule
+                data = {'firewall_policy': {'shared': True,
+                                            'firewall_rules': fw_rule_ids}}
+                req = self.new_update_request('firewall_policies', data,
+                                              fwp['firewall_policy']['id'])
+                res = req.get_response(self.ext_api)
+                self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
+    def test_update_firewall_policy_with_shared_attr_exist_unshare_rule(self):
+        with self.firewall_rule(name='fwr1', shared=False) as fr:
+            fw_rule_ids = [fr['firewall_rule']['id']]
+            with self.firewall_policy(shared=False,
+                                      firewall_rules=fw_rule_ids) as fwp:
+                # update policy with shared attr
+                data = {'firewall_policy': {'shared': True}}
+                req = self.new_update_request('firewall_policies', data,
+                                              fwp['firewall_policy']['id'])
+                res = req.get_response(self.ext_api)
+                self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
     def test_delete_firewall_policy(self):
         ctx = context.get_admin_context()
         with self.firewall_policy(do_delete=False) as fwp: