]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Stop admin using other tenants unshared rules
authorPreeti Mirji <preeti.mirji@hp.com>
Wed, 23 Jul 2014 10:22:30 +0000 (03:22 -0700)
committerKoteswara Rao Kelam <koteswara.kelam@hp.com>
Tue, 23 Sep 2014 05:14:51 +0000 (22:14 -0700)
If the firewall rules are not shared and if they belong to different
tenants, then admin should not be able to create a policy using
these rules and he should not be able to insert such rules into
policies. An exception should be raised in such case. Added new
exception “FirewallRuleConflict” to handle such conditions.

Co-Authored-By: Koteswara Rao Kelam<koteswara.kelam@hp.com>
Change-Id: I984eb76069bd1493a77bf523bec2bd81abb14abb
Closes-bug: 1327057

neutron/db/firewall/firewall_db.py
neutron/extensions/firewall.py
neutron/tests/unit/db/firewall/test_db_firewall.py

index 2e7097d551737d3854f563a3be7d9e6be8e01185..7321d1d126fa559ed6c89e20590afd428f9b9971 100644 (file)
@@ -162,6 +162,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
                'enabled': firewall_rule['enabled']}
         return self._fields(res, fields)
 
+    def _check_firewall_rule_conflict(self, fwr_db, fwp_db):
+        if not fwr_db['shared']:
+            if fwr_db['tenant_id'] != fwp_db['tenant_id']:
+                raise firewall.FirewallRuleConflict(
+                    firewall_rule_id=fwr_db['id'],
+                    tenant_id=fwr_db['tenant_id'])
+
     def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
         rule_id_list = fwp['firewall_rules']
         fwp_db = firewall_policy_db
@@ -196,6 +203,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
                     raise firewall.FirewallRuleSharingConflict(
                         firewall_rule_id=fwrule_id,
                         firewall_policy_id=fwp_db['id'])
+            for fwr_db in rules_in_db:
+                self._check_firewall_rule_conflict(fwr_db, fwp_db)
             # New list of rules is valid so we will first reset the existing
             # list and then add each rule in order.
             # Note that the list could be empty in which case we interpret
@@ -403,6 +412,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
     def update_firewall_rule(self, context, id, firewall_rule):
         LOG.debug(_("update_firewall_rule() called"))
         fwr = firewall_rule['firewall_rule']
+        fwr_db = self._get_firewall_rule(context, id)
+        if fwr_db.firewall_policy_id:
+            fwp_db = self._get_firewall_policy(context,
+                                               fwr_db.firewall_policy_id)
+            if 'shared' in fwr and not fwr['shared']:
+                if fwr_db['tenant_id'] != fwp_db['tenant_id']:
+                    raise firewall.FirewallRuleInUse(firewall_rule_id=id)
         if 'source_port' in fwr:
             src_port_min, src_port_max = self._get_min_max_ports_from_range(
                 fwr['source_port'])
@@ -416,7 +432,6 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
             fwr['destination_port_range_max'] = dst_port_max
             del fwr['destination_port']
         with context.session.begin(subtransactions=True):
-            fwr_db = self._get_firewall_rule(context, id)
             protocol = fwr.get('protocol', fwr_db['protocol'])
             if not protocol:
                 sport = fwr.get('source_port_range_min',
@@ -427,8 +442,6 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
                     raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
             fwr_db.update(fwr)
             if fwr_db.firewall_policy_id:
-                fwp_db = self._get_firewall_policy(context,
-                                                   fwr_db.firewall_policy_id)
                 fwp_db.audited = False
         return self._make_firewall_rule_dict(fwr_db)
 
@@ -476,8 +489,10 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
             insert_before = False
         with context.session.begin(subtransactions=True):
             fwr_db = self._get_firewall_rule(context, firewall_rule_id)
+            fwp_db = self._get_firewall_policy(context, id)
             if fwr_db.firewall_policy_id:
                 raise firewall.FirewallRuleInUse(firewall_rule_id=fwr_db['id'])
+            self._check_firewall_rule_conflict(fwr_db, fwp_db)
             if ref_firewall_rule_id:
                 # If reference_firewall_rule_id is set, the new rule
                 # is inserted depending on the value of insert_before.
index 89f8b0f0bf5b0b7d0cd5af71764c138f3b9f7b1b..3ae5bc7f38406dca0b275c4a8a85d86c9b887ecf 100644 (file)
@@ -127,6 +127,19 @@ class FirewallInternalDriverError(qexception.NeutronException):
     message = _("%(driver)s: Internal driver error.")
 
 
+class FirewallRuleConflict(qexception.Conflict):
+
+    """Firewall rule conflict exception.
+
+    Occurs when admin policy tries to use another tenant's unshared
+    rule.
+    """
+
+    message = _("Operation cannot be performed since Firewall Rule "
+                "%(firewall_rule_id)s is not shared and belongs to "
+                "another tenant %(tenant_id)s")
+
+
 fw_valid_protocol_values = [None, constants.TCP, constants.UDP, constants.ICMP]
 fw_valid_action_values = [constants.FWAAS_ALLOW, constants.FWAAS_DENY]
 
index 0fc3c2c61ea8184b015de9564d061211e5515257..0dd6f42aac0a95d81707cf2c782acbb0f4cdeddd 100644 (file)
@@ -335,6 +335,17 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
                 for k, v in attrs.iteritems():
                     self.assertEqual(fwp['firewall_policy'][k], v)
 
+    def test_create_admin_firewall_policy_with_other_tenant_rules(self):
+        with self.firewall_rule(shared=False) as fr:
+            fw_rule_ids = [fr['firewall_rule']['id']]
+            res = self._create_firewall_policy(None, 'firewall_policy1',
+                                               description=DESCRIPTION,
+                                               shared=SHARED,
+                                               firewall_rules=fw_rule_ids,
+                                               audited=AUDITED,
+                                               tenant_id='admin-tenant')
+            self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
     def test_create_firewall_policy_with_previously_associated_rule(self):
         with self.firewall_rule() as fwr:
             fw_rule_ids = [fwr['firewall_rule']['id']]
@@ -834,6 +845,17 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
                                  [fwr_id])
                 self.assertEqual(res['firewall_policy']['audited'], False)
 
+    def test_update_firewall_rule_associated_with_other_tenant_policy(self):
+        with self.firewall_rule(shared=True, tenant_id='tenant1') as fwr:
+            fwr_id = [fwr['firewall_rule']['id']]
+            with self.firewall_policy(shared=False,
+                                      firewall_rules=fwr_id):
+                data = {'firewall_rule': {'shared': False}}
+                req = self.new_update_request('firewall_rules', data,
+                                              fwr['firewall_rule']['id'])
+                res = req.get_response(self.ext_api)
+                self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
     def test_delete_firewall_rule(self):
         ctx = context.get_admin_context()
         with self.firewall_rule(do_delete=False) as fwr:
@@ -1057,6 +1079,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
                     expected_code=webob.exc.HTTPBadRequest.code,
                     expected_body=None)
 
+    def test_insert_rule_for_policy_of_other_tenant(self):
+        with self.firewall_rule(tenant_id='tenant-2', shared=False) as fwr:
+            fwr_id = fwr['firewall_rule']['id']
+            with self.firewall_policy(name='firewall_policy') as fwp:
+                fwp_id = fwp['firewall_policy']['id']
+                insert_data = {'firewall_rule_id': fwr_id}
+                self._rule_action(
+                    'insert', fwp_id, fwr_id, insert_before=None,
+                    insert_after=None,
+                    expected_code=webob.exc.HTTPConflict.code,
+                    expected_body=None, body_data=insert_data)
+
     def test_insert_rule_in_policy(self):
         attrs = self._get_test_firewall_policy_attrs()
         attrs['audited'] = False