]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix unable to add allow all IPv4/6 security group rule
authorAaron Rosen <arosen@nicira.com>
Thu, 21 Nov 2013 13:28:28 +0000 (05:28 -0800)
committerAaron Rosen <arosen@nicira.com>
Sun, 1 Dec 2013 18:44:39 +0000 (10:44 -0800)
Previously, if one tried to add a rule to allow all ingress ipv4 neutron
would respond that the rule was already part of the security group.
This happened as the filter for querying existing rules uses a wildcard for
remote_group_id thus returning a false match. This patch addresses
this issue.

Change-Id: I0320013a3869d25fb424995354721929465d2848
Closes-bug: #1252806

neutron/db/securitygroups_db.py
neutron/tests/unit/test_extension_security_group.py

index 5986190c5d466dda91806850b1cd3698d6bd5208..2a7d2ef035413211b7a329c171e08cad4e32d640 100644 (file)
@@ -395,9 +395,21 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
 
             # Check in database if rule exists
             filters = self._make_security_group_rule_filter_dict(i)
-            rules = self.get_security_group_rules(context, filters)
-            if rules:
-                raise ext_sg.SecurityGroupRuleExists(id=str(rules[0]['id']))
+            db_rules = self.get_security_group_rules(context, filters)
+            # Note(arosen): the call to get_security_group_rules wildcards
+            # values in the filter that have a value of [None]. For
+            # example, filters = {'remote_group_id': [None]} will return
+            # all security group rules regardless of their value of
+            # remote_group_id. Therefore it is not possible to do this
+            # query unless the behavior of _get_collection()
+            # is changed which cannot be because other methods are already
+            # relying on this behavor. Therefore, we do the filtering
+            # below to check for these corner cases.
+            for db_rule in db_rules:
+                # need to remove id from db_rule for matching
+                id = db_rule.pop('id')
+                if (i['security_group_rule'] == db_rule):
+                    raise ext_sg.SecurityGroupRuleExists(id=id)
 
     def get_security_group_rules(self, context, filters=None, fields=None,
                                  sorts=None, limit=None, marker=None,
index d77f200a8e4ff75c5a17cfa2a6ac558049843b2f..8028ac40731b8633c4bcc133edb47672cfd51226 100644 (file)
@@ -1175,6 +1175,37 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 self.deserialize(self.fmt, res)
                 self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
 
+    def test_create_security_group_rule_allow_all_ipv4(self):
+        with self.security_group() as sg:
+            rule = {'security_group_id': sg['security_group']['id'],
+                    'direction': 'ingress',
+                    'ethertype': 'IPv4',
+                    'tenant_id': 'test_tenant'}
+
+            res = self._create_security_group_rule(
+                self.fmt, {'security_group_rule': rule})
+            rule = self.deserialize(self.fmt, res)
+            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
+    def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self):
+        if self._skip_native_bulk:
+            self.skipTest("Plugin does not support native bulk "
+                          "security_group_rule create")
+        with self.security_group() as sg:
+            rule_v4 = {'security_group_id': sg['security_group']['id'],
+                       'direction': 'ingress',
+                       'ethertype': 'IPv4',
+                       'tenant_id': 'test_tenant'}
+            rule_v6 = {'security_group_id': sg['security_group']['id'],
+                       'direction': 'ingress',
+                       'ethertype': 'IPv6',
+                       'tenant_id': 'test_tenant'}
+
+            rules = {'security_group_rules': [rule_v4, rule_v6]}
+            res = self._create_security_group_rule(self.fmt, rules)
+            self.deserialize(self.fmt, res)
+            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
     def test_create_security_group_rule_duplicate_rule_in_post(self):
         if self._skip_native_bulk:
             self.skipTest("Plugin does not support native bulk "