from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
+from neutron.common import utils
from neutron.i18n import _LW
from neutron import manager
return dict(
(port['id'], port)
for port in self.plugin.get_ports_from_devices(context, devices)
- if port and not port['device_owner'].startswith('network:')
+ if port and not utils.is_port_trusted(port)
)
def security_group_rules_for_devices(self, context, **kwargs):
raise ValueError(_('Illegal IP version number'))
+def is_port_trusted(port):
+ """Used to determine if port can be trusted not to attack network.
+
+ Trust is currently based on the device_owner field starting with 'network:'
+ since we restrict who can use that in the default policy.json file.
+ """
+ return port['device_owner'].startswith('network:')
+
+
class DelayedStringRenderer(object):
"""Takes a callable and its args and calls when __str__ is called
# under the License.
from neutron.api.v2 import attributes as attrs
+from neutron.common import utils
from neutron.db import db_base_plugin_v2
from neutron.db import portsecurity_db_common
from neutron.extensions import portsecurity as psec
"""
has_ip = self._ip_on_port(port)
# we don't apply security groups for dhcp, router
- if (port.get('device_owner') and
- port['device_owner'].startswith('network:')):
+ if port.get('device_owner') and utils.is_port_trusted(port):
return (False, has_ip)
if attrs.is_attr_set(port.get(psec.PORTSECURITY)):
:returns: all security groups IDs on port belonging to tenant.
"""
- p = port['port']
- if not attributes.is_attr_set(p.get(ext_sg.SECURITYGROUPS)):
+ port = port['port']
+ if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
return
- if p.get('device_owner') and p['device_owner'].startswith('network:'):
+ if port.get('device_owner') and utils.is_port_trusted(port):
return
- port_sg = p.get(ext_sg.SECURITYGROUPS, [])
+ port_sg = port.get(ext_sg.SECURITYGROUPS, [])
filters = {'id': port_sg}
- tenant_id = p.get('tenant_id')
+ tenant_id = port.get('tenant_id')
if tenant_id:
filters['tenant_id'] = [tenant_id]
valid_groups = set(g['id'] for g in
def _ensure_default_security_group_on_port(self, context, port):
# we don't apply security groups for dhcp, router
- if (port['port'].get('device_owner') and
- port['port']['device_owner'].startswith('network:')):
+ port = port['port']
+ if port.get('device_owner') and utils.is_port_trusted(port):
return
- tenant_id = self._get_tenant_id_for_create(context,
- port['port'])
+ tenant_id = self._get_tenant_id_for_create(context, port)
default_sg = self._ensure_default_security_group(context, tenant_id)
- if not attributes.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
- port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
+ if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
+ port[ext_sg.SECURITYGROUPS] = [default_sg]
def _check_update_deletes_security_groups(self, port):
"""Return True if port has as a security group and it's value
from oslo_log import log as logging
from neutron.agent.linux import ip_lib
+from neutron.common import utils
from neutron.i18n import _LI
LOG = logging.getLogger(__name__)
LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
"it has port security disabled"), vif)
return
- if port_details['device_owner'].startswith('network:'):
+ if utils.is_port_trusted(port_details):
# clear any previous entries related to this port
delete_arp_spoofing_protection([vif], current_rules)
LOG.debug("Skipping ARP spoofing rules for network owned port "
# under the License.
from neutron.api.v2 import attributes as attrs
+from neutron.common import utils
from neutron.db import common_db_mixin
from neutron.db import portsecurity_db_common as ps_db_common
from neutron.extensions import portsecurity as psec
otherwise the value associated with the network is returned.
"""
# we don't apply security groups for dhcp, router
- if (port.get('device_owner') and
- port['device_owner'].startswith('network:')):
+ if port.get('device_owner') and utils.is_port_trusted(port):
return False
if attrs.is_attr_set(port.get(psec.PORTSECURITY)):