]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Validate CIDR given as ip-prefix in security-group-rule-create
authormarios <marios@redhat.com>
Fri, 29 Nov 2013 16:23:54 +0000 (18:23 +0200)
committerThomas Goirand <thomas@goirand.fr>
Mon, 9 Jun 2014 15:06:53 +0000 (23:06 +0800)
There was no validation for the provided ip prefix. This just adds
a simple parse using netaddr and explodes with appropriate message.
Also makes sure ip prefix _is_ cidr (192.168.1.1-->192.168.1.1/32).

Validation occurs at the attribute level (API model) as well as at
the db level, where the ethertype is validated against the ip_prefix
address type.

Unit test cases added - bad prefix, unmasked prefix and incorrect
ethertype. Also adds attribute test cases for the added
convert_ip_prefix_to_cidr method

Closes-Bug: 1255338

Conflicts:
neutron/tests/unit/test_security_groups_rpc.py

Change-Id: I71fb8c887963a122a5bd8cfdda800026c1cd3954
(cherry picked from commit 65aa92b0348b7ab8413f359b00825610cdf66607)

neutron/common/exceptions.py
neutron/db/securitygroups_db.py
neutron/extensions/securitygroup.py
neutron/tests/unit/test_extension_security_group.py

index bfd267ef46ed7f755386cb5302babd1adcf99bb2..e81b4af4e46ed5ad878a58f3dea0bdf308ec05ea 100644 (file)
@@ -319,3 +319,7 @@ class DuplicatedExtension(NeutronException):
 class DeviceIDNotOwnedByTenant(Conflict):
     message = _("The following device_id %(device_id)s is not owned by your "
                 "tenant or matches another tenants router.")
+
+
+class InvalidCIDR(BadRequest):
+    message = _("Invalid CIDR %(input)s given as IP prefix")
index 882a43d6256d4a128d5af35e9656da57e59e20bf..de464d6b1312bb61869b2202f47706ba437820b4 100644 (file)
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import netaddr
 import sqlalchemy as sa
 from sqlalchemy import orm
 from sqlalchemy.orm import exc
@@ -327,6 +328,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
             new_rules.add(rule['security_group_id'])
 
             self._validate_port_range(rule)
+            self._validate_ip_prefix(rule)
 
             if rule['remote_ip_prefix'] and rule['remote_group_id']:
                 raise ext_sg.SecurityGroupRemoteGroupAndRemoteIpPrefix()
@@ -407,6 +409,24 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
                 if (i['security_group_rule'] == db_rule):
                     raise ext_sg.SecurityGroupRuleExists(id=id)
 
+    def _validate_ip_prefix(self, rule):
+        """Check that a valid cidr was specified as remote_ip_prefix
+
+        No need to check that it is in fact an IP address as this is already
+        validated by attribute validators.
+        Check that rule ethertype is consistent with remote_ip_prefix ip type.
+        Add mask to ip_prefix if absent (192.168.1.10 -> 192.168.1.10/32).
+        """
+        input_prefix = rule['remote_ip_prefix']
+        if input_prefix:
+            addr = netaddr.IPNetwork(input_prefix)
+            # set input_prefix to always include the netmask:
+            rule['remote_ip_prefix'] = str(addr)
+            # check consistency of ethertype with addr version
+            if rule['ethertype'] != "IPv%d" % (addr.version):
+                raise ext_sg.SecurityGroupRuleParameterConflict(
+                    ethertype=rule['ethertype'], cidr=input_prefix)
+
     def get_security_group_rules(self, context, filters=None, fields=None,
                                  sorts=None, limit=None, marker=None,
                                  page_reverse=False):
index ad2960facd3450360bd8522464703dbd41e4eec6..637dbe3d7eabc04379933d9b7ca480a3aa3867a2 100644 (file)
@@ -17,6 +17,7 @@
 
 from abc import ABCMeta
 from abc import abstractmethod
+import netaddr
 
 from oslo.config import cfg
 import six
@@ -103,6 +104,10 @@ class SecurityGroupRuleExists(qexception.InUse):
     message = _("Security group rule already exists. Group id is %(id)s.")
 
 
+class SecurityGroupRuleParameterConflict(qexception.InvalidInput):
+    message = _("Conflicting value ethertype %(ethertype)s for CIDR %(cidr)s")
+
+
 def convert_protocol(value):
     if value is None:
         return
@@ -153,6 +158,16 @@ def convert_to_uuid_list_or_none(value_list):
     return value_list
 
 
+def convert_ip_prefix_to_cidr(ip_prefix):
+    if not ip_prefix:
+        return
+    try:
+        cidr = netaddr.IPNetwork(ip_prefix)
+        return str(cidr)
+    except (TypeError, netaddr.AddrFormatError):
+        raise qexception.InvalidCIDR(input=ip_prefix)
+
+
 def _validate_name_not_default(data, valid_values=None):
     if data == "default":
         raise SecurityGroupDefaultAlreadyExists()
@@ -208,7 +223,8 @@ RESOURCE_ATTRIBUTE_MAP = {
                       'convert_to': convert_ethertype_to_case_insensitive,
                       'validate': {'type:values': sg_supported_ethertypes}},
         'remote_ip_prefix': {'allow_post': True, 'allow_put': False,
-                             'default': None, 'is_visible': True},
+                             'default': None, 'is_visible': True,
+                             'convert_to': convert_ip_prefix_to_cidr},
         'tenant_id': {'allow_post': True, 'allow_put': False,
                       'required_by_policy': True,
                       'is_visible': True},
index 1881d8c848436c088b5871a3f5b6cde11bc52291..6de9e2a8db2306e37965205483414459aa700b65 100644 (file)
@@ -21,10 +21,12 @@ import webob.exc
 
 from neutron.api.v2 import attributes as attr
 from neutron.common import constants as const
+from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron.db import db_base_plugin_v2
 from neutron.db import securitygroups_db
 from neutron.extensions import securitygroup as ext_sg
+from neutron.tests import base
 from neutron.tests.unit import test_db_plugin
 
 DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_security_group.'
@@ -404,6 +406,70 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
             self.deserialize(self.fmt, res)
             self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
+    def test_create_security_group_rule_invalid_ip_prefix(self):
+        name = 'webservers'
+        description = 'my webservers'
+        for bad_prefix in ['bad_ip', 256, "2001:db8:a::123/129", '172.30./24']:
+            with self.security_group(name, description) as sg:
+                sg_id = sg['security_group']['id']
+                remote_ip_prefix = bad_prefix
+                rule = self._build_security_group_rule(
+                    sg_id,
+                    'ingress',
+                    const.PROTO_NAME_TCP,
+                    '22', '22',
+                    remote_ip_prefix)
+                res = self._create_security_group_rule(self.fmt, rule)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+
+    def test_create_security_group_rule_invalid_ethertype_for_prefix(self):
+        name = 'webservers'
+        description = 'my webservers'
+        test_addr = {'192.168.1.1/24': 'ipv4', '192.168.1.1/24': 'IPv6',
+                     '2001:db8:1234::/48': 'ipv6',
+                     '2001:db8:1234::/48': 'IPv4'}
+        for prefix, ether in test_addr.iteritems():
+            with self.security_group(name, description) as sg:
+                sg_id = sg['security_group']['id']
+                ethertype = ether
+                remote_ip_prefix = prefix
+                rule = self._build_security_group_rule(
+                    sg_id,
+                    'ingress',
+                    const.PROTO_NAME_TCP,
+                    '22', '22',
+                    remote_ip_prefix,
+                    None,
+                    None,
+                    ethertype)
+                res = self._create_security_group_rule(self.fmt, rule)
+                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+
+    def test_create_security_group_rule_with_unmasked_prefix(self):
+        name = 'webservers'
+        description = 'my webservers'
+        addr = {'10.1.2.3': {'mask': '32', 'ethertype': 'IPv4'},
+                'fe80::2677:3ff:fe7d:4c': {'mask': '128', 'ethertype': 'IPv6'}}
+        for ip in addr:
+            with self.security_group(name, description) as sg:
+                sg_id = sg['security_group']['id']
+                ethertype = addr[ip]['ethertype']
+                remote_ip_prefix = ip
+                rule = self._build_security_group_rule(
+                    sg_id,
+                    'ingress',
+                    const.PROTO_NAME_TCP,
+                    '22', '22',
+                    remote_ip_prefix,
+                    None,
+                    None,
+                    ethertype)
+                res = self._create_security_group_rule(self.fmt, rule)
+                self.assertEqual(res.status_int, 201)
+                res_sg = self.deserialize(self.fmt, res)
+                prefix = res_sg['security_group_rule']['remote_ip_prefix']
+                self.assertEqual(prefix, '%s/%s' % (ip, addr[ip]['mask']))
+
     def test_create_security_group_rule_tcp_protocol_as_number(self):
         name = 'webservers'
         description = 'my webservers'
@@ -1335,5 +1401,25 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
                 self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
 
 
+class TestConvertIPPrefixToCIDR(base.BaseTestCase):
+
+    def test_convert_bad_ip_prefix_to_cidr(self):
+        for val in ['bad_ip', 256, "2001:db8:a::123/129"]:
+            self.assertRaises(n_exc.InvalidCIDR,
+                              ext_sg.convert_ip_prefix_to_cidr, val)
+        self.assertIsNone(ext_sg.convert_ip_prefix_to_cidr(None))
+
+    def test_convert_ip_prefix_no_netmask_to_cidr(self):
+        addr = {'10.1.2.3': '32', 'fe80::2677:3ff:fe7d:4c': '128'}
+        for k, v in addr.iteritems():
+            self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(k),
+                             '%s/%s' % (k, v))
+
+    def test_convert_ip_prefix_with_netmask_to_cidr(self):
+        addresses = ['10.1.0.0/16', '10.1.2.3/32', '2001:db8:1234::/48']
+        for addr in addresses:
+            self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(addr), addr)
+
+
 class TestSecurityGroupsXML(TestSecurityGroups):
     fmt = 'xml'