]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add utility function for checking trusted port
authorKevin Benton <blak111@gmail.com>
Thu, 3 Sep 2015 17:01:40 +0000 (10:01 -0700)
committerKevin Benton <kevinbenton@buttewifi.com>
Mon, 14 Sep 2015 10:41:54 +0000 (10:41 +0000)
Ports that have a device_owner that starts with 'network:'
are trusted in several places throughout the codebase. Each
of these did a startswith check on each field and it's not
immediately obvious why it's done.

This patch adds a utility function called 'is_port_trusted'
that performs the same check and makes it obvious what is
being done.

Change-Id: I542c753776d5cfb2fd736b25ea6e111867c89c89

neutron/api/rpc/handlers/securitygroups_rpc.py
neutron/common/utils.py
neutron/db/portsecurity_db.py
neutron/db/securitygroups_db.py
neutron/plugins/ml2/drivers/linuxbridge/agent/arp_protect.py
neutron/plugins/ml2/extensions/port_security.py

index c8115d0d096c5ba8e035700657084a5e1f2a6003..7401fcda6724d2e3e014844e93bce5c90e5fc527 100644 (file)
@@ -19,6 +19,7 @@ from oslo_log import log as logging
 from neutron.common import constants
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
+from neutron.common import utils
 from neutron.i18n import _LW
 from neutron import manager
 
@@ -80,7 +81,7 @@ class SecurityGroupServerRpcCallback(object):
         return dict(
             (port['id'], port)
             for port in self.plugin.get_ports_from_devices(context, devices)
-            if port and not port['device_owner'].startswith('network:')
+            if port and not utils.is_port_trusted(port)
         )
 
     def security_group_rules_for_devices(self, context, **kwargs):
index a8b79e99fb5db26815a9eb8f01face21b8c62a0a..f44467efd281a2f75a978c3b03a71e0136944623 100644 (file)
@@ -432,6 +432,15 @@ def ip_version_from_int(ip_version_int):
     raise ValueError(_('Illegal IP version number'))
 
 
+def is_port_trusted(port):
+    """Used to determine if port can be trusted not to attack network.
+
+    Trust is currently based on the device_owner field starting with 'network:'
+    since we restrict who can use that in the default policy.json file.
+    """
+    return port['device_owner'].startswith('network:')
+
+
 class DelayedStringRenderer(object):
     """Takes a callable and its args and calls when __str__ is called
 
index cf78c80f444a5b4d6392584ad94b4167fdb413cf..343b5371050145812c4ccb4ad6bede464b533d37 100644 (file)
@@ -13,6 +13,7 @@
 #    under the License.
 
 from neutron.api.v2 import attributes as attrs
+from neutron.common import utils
 from neutron.db import db_base_plugin_v2
 from neutron.db import portsecurity_db_common
 from neutron.extensions import portsecurity as psec
@@ -40,8 +41,7 @@ class PortSecurityDbMixin(portsecurity_db_common.PortSecurityDbCommon):
         """
         has_ip = self._ip_on_port(port)
         # we don't apply security groups for dhcp, router
-        if (port.get('device_owner') and
-                port['device_owner'].startswith('network:')):
+        if port.get('device_owner') and utils.is_port_trusted(port):
             return (False, has_ip)
 
         if attrs.is_attr_set(port.get(psec.PORTSECURITY)):
index 7fe30c86142647eddc7768540900481137d5117b..7c589046990a6434fe90e93f0b5373e24f2453b6 100644 (file)
@@ -686,15 +686,15 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
 
         :returns: all security groups IDs on port belonging to tenant.
         """
-        p = port['port']
-        if not attributes.is_attr_set(p.get(ext_sg.SECURITYGROUPS)):
+        port = port['port']
+        if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
             return
-        if p.get('device_owner') and p['device_owner'].startswith('network:'):
+        if port.get('device_owner') and utils.is_port_trusted(port):
             return
 
-        port_sg = p.get(ext_sg.SECURITYGROUPS, [])
+        port_sg = port.get(ext_sg.SECURITYGROUPS, [])
         filters = {'id': port_sg}
-        tenant_id = p.get('tenant_id')
+        tenant_id = port.get('tenant_id')
         if tenant_id:
             filters['tenant_id'] = [tenant_id]
         valid_groups = set(g['id'] for g in
@@ -710,14 +710,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
 
     def _ensure_default_security_group_on_port(self, context, port):
         # we don't apply security groups for dhcp, router
-        if (port['port'].get('device_owner') and
-                port['port']['device_owner'].startswith('network:')):
+        port = port['port']
+        if port.get('device_owner') and utils.is_port_trusted(port):
             return
-        tenant_id = self._get_tenant_id_for_create(context,
-                                                   port['port'])
+        tenant_id = self._get_tenant_id_for_create(context, port)
         default_sg = self._ensure_default_security_group(context, tenant_id)
-        if not attributes.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
-            port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
+        if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
+            port[ext_sg.SECURITYGROUPS] = [default_sg]
 
     def _check_update_deletes_security_groups(self, port):
         """Return True if port has as a security group and it's value
index 85be5888022ff1dcc91730408778300b506cf2ba..f648c1d93243a78ca0a35b2e4891f9624010d953 100644 (file)
@@ -18,6 +18,7 @@ from oslo_concurrency import lockutils
 from oslo_log import log as logging
 
 from neutron.agent.linux import ip_lib
+from neutron.common import utils
 from neutron.i18n import _LI
 
 LOG = logging.getLogger(__name__)
@@ -32,7 +33,7 @@ def setup_arp_spoofing_protection(vif, port_details):
         LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
                      "it has port security disabled"), vif)
         return
-    if port_details['device_owner'].startswith('network:'):
+    if utils.is_port_trusted(port_details):
         # clear any previous entries related to this port
         delete_arp_spoofing_protection([vif], current_rules)
         LOG.debug("Skipping ARP spoofing rules for network owned port "
index cb582f3b28f91528f552fa29751fb994a9f3d144..6a10a41a617d0c790e010a8121aaf1ebbe186987 100644 (file)
@@ -14,6 +14,7 @@
 #    under the License.
 
 from neutron.api.v2 import attributes as attrs
+from neutron.common import utils
 from neutron.db import common_db_mixin
 from neutron.db import portsecurity_db_common as ps_db_common
 from neutron.extensions import portsecurity as psec
@@ -80,8 +81,7 @@ class PortSecurityExtensionDriver(api.ExtensionDriver,
         otherwise the value associated with the network is returned.
         """
         # we don't apply security groups for dhcp, router
-        if (port.get('device_owner') and
-                port['device_owner'].startswith('network:')):
+        if port.get('device_owner') and utils.is_port_trusted(port):
             return False
 
         if attrs.is_attr_set(port.get(psec.PORTSECURITY)):