From: Kevin Benton Date: Thu, 3 Sep 2015 17:01:40 +0000 (-0700) Subject: Add utility function for checking trusted port X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=638d16c8a019cfdafa2b6bb12c95775544bb58df;p=openstack-build%2Fneutron-build.git Add utility function for checking trusted port Ports that have a device_owner that starts with 'network:' are trusted in several places throughout the codebase. Each of these did a startswith check on each field and it's not immediately obvious why it's done. This patch adds a utility function called 'is_port_trusted' that performs the same check and makes it obvious what is being done. Change-Id: I542c753776d5cfb2fd736b25ea6e111867c89c89 --- diff --git a/neutron/api/rpc/handlers/securitygroups_rpc.py b/neutron/api/rpc/handlers/securitygroups_rpc.py index c8115d0d0..7401fcda6 100644 --- a/neutron/api/rpc/handlers/securitygroups_rpc.py +++ b/neutron/api/rpc/handlers/securitygroups_rpc.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from neutron.common import constants from neutron.common import rpc as n_rpc from neutron.common import topics +from neutron.common import utils from neutron.i18n import _LW from neutron import manager @@ -80,7 +81,7 @@ class SecurityGroupServerRpcCallback(object): return dict( (port['id'], port) for port in self.plugin.get_ports_from_devices(context, devices) - if port and not port['device_owner'].startswith('network:') + if port and not utils.is_port_trusted(port) ) def security_group_rules_for_devices(self, context, **kwargs): diff --git a/neutron/common/utils.py b/neutron/common/utils.py index a8b79e99f..f44467efd 100644 --- a/neutron/common/utils.py +++ b/neutron/common/utils.py @@ -432,6 +432,15 @@ def ip_version_from_int(ip_version_int): raise ValueError(_('Illegal IP version number')) +def is_port_trusted(port): + """Used to determine if port can be trusted not to attack network. + + Trust is currently based on the device_owner field starting with 'network:' + since we restrict who can use that in the default policy.json file. + """ + return port['device_owner'].startswith('network:') + + class DelayedStringRenderer(object): """Takes a callable and its args and calls when __str__ is called diff --git a/neutron/db/portsecurity_db.py b/neutron/db/portsecurity_db.py index cf78c80f4..343b53710 100644 --- a/neutron/db/portsecurity_db.py +++ b/neutron/db/portsecurity_db.py @@ -13,6 +13,7 @@ # under the License. from neutron.api.v2 import attributes as attrs +from neutron.common import utils from neutron.db import db_base_plugin_v2 from neutron.db import portsecurity_db_common from neutron.extensions import portsecurity as psec @@ -40,8 +41,7 @@ class PortSecurityDbMixin(portsecurity_db_common.PortSecurityDbCommon): """ has_ip = self._ip_on_port(port) # we don't apply security groups for dhcp, router - if (port.get('device_owner') and - port['device_owner'].startswith('network:')): + if port.get('device_owner') and utils.is_port_trusted(port): return (False, has_ip) if attrs.is_attr_set(port.get(psec.PORTSECURITY)): diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index 7fe30c861..7c5890469 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -686,15 +686,15 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): :returns: all security groups IDs on port belonging to tenant. """ - p = port['port'] - if not attributes.is_attr_set(p.get(ext_sg.SECURITYGROUPS)): + port = port['port'] + if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)): return - if p.get('device_owner') and p['device_owner'].startswith('network:'): + if port.get('device_owner') and utils.is_port_trusted(port): return - port_sg = p.get(ext_sg.SECURITYGROUPS, []) + port_sg = port.get(ext_sg.SECURITYGROUPS, []) filters = {'id': port_sg} - tenant_id = p.get('tenant_id') + tenant_id = port.get('tenant_id') if tenant_id: filters['tenant_id'] = [tenant_id] valid_groups = set(g['id'] for g in @@ -710,14 +710,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): def _ensure_default_security_group_on_port(self, context, port): # we don't apply security groups for dhcp, router - if (port['port'].get('device_owner') and - port['port']['device_owner'].startswith('network:')): + port = port['port'] + if port.get('device_owner') and utils.is_port_trusted(port): return - tenant_id = self._get_tenant_id_for_create(context, - port['port']) + tenant_id = self._get_tenant_id_for_create(context, port) default_sg = self._ensure_default_security_group(context, tenant_id) - if not attributes.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)): - port['port'][ext_sg.SECURITYGROUPS] = [default_sg] + if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)): + port[ext_sg.SECURITYGROUPS] = [default_sg] def _check_update_deletes_security_groups(self, port): """Return True if port has as a security group and it's value diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/arp_protect.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/arp_protect.py index 85be58880..f648c1d93 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/arp_protect.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/arp_protect.py @@ -18,6 +18,7 @@ from oslo_concurrency import lockutils from oslo_log import log as logging from neutron.agent.linux import ip_lib +from neutron.common import utils from neutron.i18n import _LI LOG = logging.getLogger(__name__) @@ -32,7 +33,7 @@ def setup_arp_spoofing_protection(vif, port_details): LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because " "it has port security disabled"), vif) return - if port_details['device_owner'].startswith('network:'): + if utils.is_port_trusted(port_details): # clear any previous entries related to this port delete_arp_spoofing_protection([vif], current_rules) LOG.debug("Skipping ARP spoofing rules for network owned port " diff --git a/neutron/plugins/ml2/extensions/port_security.py b/neutron/plugins/ml2/extensions/port_security.py index cb582f3b2..6a10a41a6 100644 --- a/neutron/plugins/ml2/extensions/port_security.py +++ b/neutron/plugins/ml2/extensions/port_security.py @@ -14,6 +14,7 @@ # under the License. from neutron.api.v2 import attributes as attrs +from neutron.common import utils from neutron.db import common_db_mixin from neutron.db import portsecurity_db_common as ps_db_common from neutron.extensions import portsecurity as psec @@ -80,8 +81,7 @@ class PortSecurityExtensionDriver(api.ExtensionDriver, otherwise the value associated with the network is returned. """ # we don't apply security groups for dhcp, router - if (port.get('device_owner') and - port['device_owner'].startswith('network:')): + if port.get('device_owner') and utils.is_port_trusted(port): return False if attrs.is_attr_set(port.get(psec.PORTSECURITY)):