# Each service definition should be in the following format:
# <service>:<plugin>[:driver]
-[SECURITYGROUP]
-# If set to true this allows quantum to receive proxied security group calls from nova
-# proxy_mode = False
-
[AGENT]
# Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
# root filter facility.
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('description', sa.String(length=255), nullable=True),
- sa.Column('external_id', sa.Integer(), nullable=True),
- sa.PrimaryKeyConstraint('id'),
- sa.UniqueConstraint('external_id')
+ sa.PrimaryKeyConstraint('id')
)
op.create_table(
'securitygrouprules',
sa.Column('tenant_id', sa.String(length=255), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
- sa.Column('external_id', sa.Integer(), nullable=True),
sa.Column('security_group_id', sa.String(length=36), nullable=False),
sa.Column('source_group_id', sa.String(length=36), nullable=True),
sa.Column('direction',
#
# @author: Aaron Rosen, Nicira, Inc
-from oslo.config import cfg
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
"""Represents a v2 quantum security group."""
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
- external_id = sa.Column(sa.Integer, unique=True)
class SecurityGroupPortBinding(model_base.BASEV2):
class SecurityGroupRule(model_base.BASEV2, models_v2.HasId,
models_v2.HasTenant):
"""Represents a v2 quantum security group rule."""
- external_id = sa.Column(sa.Integer)
security_group_id = sa.Column(sa.String(36),
sa.ForeignKey("securitygroups.id",
ondelete="CASCADE"),
a given tenant if it does not exist.
"""
s = security_group['security_group']
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not s.get('external_id')):
- raise ext_sg.SecurityGroupProxyMode()
- if not cfg.CONF.SECURITYGROUP.proxy_mode and s.get('external_id'):
- raise ext_sg.SecurityGroupNotProxyMode()
-
tenant_id = self._get_tenant_id_for_create(context, s)
- # if in proxy mode a default security group will be created by source
- if not default_sg and not cfg.CONF.SECURITYGROUP.proxy_mode:
- self._ensure_default_security_group(context, tenant_id,
- security_group)
- if s.get('external_id'):
- try:
- # Check if security group already exists
- sg = self.get_security_group(context, s.get('external_id'))
- if sg:
- raise ext_sg.SecurityGroupAlreadyExists(
- name=sg.get('name', ''),
- external_id=s.get('external_id'))
- except ext_sg.SecurityGroupNotFound:
- pass
+ if not default_sg:
+ self._ensure_default_security_group(context, tenant_id)
with context.session.begin(subtransactions=True):
security_group_db = SecurityGroup(id=s.get('id') or (
uuidutils.generate_uuid()),
description=s['description'],
tenant_id=tenant_id,
- name=s['name'],
- external_id=s.get('external_id'))
+ name=s['name'])
context.session.add(security_group_db)
if s.get('name') == 'default':
for ethertype in ext_sg.sg_supported_ethertypes:
def _get_security_group(self, context, id):
try:
query = self._model_query(context, SecurityGroup)
- if uuidutils.is_uuid_like(id):
- sg = query.filter(SecurityGroup.id == id).one()
- else:
- sg = query.filter(SecurityGroup.external_id == id).one()
+ sg = query.filter(SecurityGroup.id == id).one()
except exc.NoResultFound:
raise ext_sg.SecurityGroupNotFound(id=id)
return sg
def delete_security_group(self, context, id):
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
-
filters = {'security_group_id': [id]}
ports = self._get_port_security_group_bindings(context, filters)
if ports:
'name': security_group['name'],
'tenant_id': security_group['tenant_id'],
'description': security_group['description']}
- if security_group.get('external_id'):
- res['external_id'] = security_group['external_id']
res['security_group_rules'] = [self._make_security_group_rule_dict(r)
for r in security_group.rules]
return self._fields(res, fields)
id=uuidutils.generate_uuid(), tenant_id=tenant_id,
security_group_id=rule['security_group_id'],
direction=rule['direction'],
- external_id=rule.get('external_id'),
source_group_id=rule.get('source_group_id'),
ethertype=rule['ethertype'],
protocol=rule['protocol'],
group, source_group_id/security_group_id belong to the same tenant,
and rules are valid.
"""
-
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
-
new_rules = set()
tenant_ids = set()
for rules in security_group_rule['security_group_rules']:
rule = rules.get('security_group_rule')
new_rules.add(rule['security_group_id'])
- if (cfg.CONF.SECURITYGROUP.proxy_mode and
- not rule.get('external_id')):
- raise ext_sg.SecurityGroupProxyMode()
- if (not cfg.CONF.SECURITYGROUP.proxy_mode and
- rule.get('external_id')):
- raise ext_sg.SecurityGroupNotProxyMode()
-
# Check that port_range's are valid
if (rule['port_range_min'] is None and
rule['port_range_max'] is None):
'port_range_min': security_group_rule['port_range_min'],
'port_range_max': security_group_rule['port_range_max'],
'source_ip_prefix': security_group_rule['source_ip_prefix'],
- 'source_group_id': security_group_rule['source_group_id'],
- 'external_id': security_group_rule['external_id']}
+ 'source_group_id': security_group_rule['source_group_id']}
return self._fields(res, fields)
include_if_present = ['protocol', 'port_range_max', 'port_range_min',
'ethertype', 'source_ip_prefix',
- 'source_group_id', 'external_id']
+ 'source_group_id']
for key in include_if_present:
value = sgr.get(key)
if value:
def _get_security_group_rule(self, context, id):
try:
- if uuidutils.is_uuid_like(id):
- query = self._model_query(context, SecurityGroupRule)
- sgr = query.filter(SecurityGroupRule.id == id).one()
- else:
- query = self._model_query(context, SecurityGroupRule)
- sgr = query.filter(SecurityGroupRule.external_id == id).one()
+ query = self._model_query(context, SecurityGroupRule)
+ sgr = query.filter(SecurityGroupRule.id == id).one()
except exc.NoResultFound:
raise ext_sg.SecurityGroupRuleNotFound(id=id)
return sgr
def delete_security_group_rule(self, context, id):
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
with context.session.begin(subtransactions=True):
rule = self._get_security_group_rule(context, id)
context.session.delete(rule)
self._create_port_security_group_binding(context, port_id,
security_group_id)
- def _ensure_default_security_group(self, context, tenant_id,
- security_group=None):
+ def _ensure_default_security_group(self, context, tenant_id):
"""Create a default security group if one doesn't exist.
:returns: the default security group id.
"""
- # if in proxy mode a default security group will be created by source
- if not security_group and cfg.CONF.SECURITYGROUP.proxy_mode:
- return
-
filters = {'name': ['default'], 'tenant_id': [tenant_id]}
default_group = self.get_security_groups(context, filters)
if not default_group:
security_group = {'security_group': {'name': 'default',
'tenant_id': tenant_id,
'description': 'default'}}
- if security_group:
- security_group['security_group']['external_id'] = (
- security_group['security_group'].get('external_id'))
ret = self.create_security_group(context, security_group, True)
return ret['id']
else:
if p.get('device_owner') and p['device_owner'].startswith('network:'):
return
- valid_groups = self.get_security_groups(
- context, fields=['external_id', 'id'])
+ valid_groups = self.get_security_groups(context, fields=['id'])
valid_group_map = dict((g['id'], g['id']) for g in valid_groups)
- valid_group_map.update((g['external_id'], g['id'])
- for g in valid_groups if g.get('external_id'))
try:
return set([valid_group_map[sg_id]
for sg_id in p.get(ext_sg.SECURITYGROUPS, [])])
raise ext_sg.SecurityGroupNotFound(id=str(e))
def _ensure_default_security_group_on_port(self, context, port):
- # return if proxy_mode is enabled since nova will handle adding
- # the port to the default security group.
- if cfg.CONF.SECURITYGROUP.proxy_mode:
- return
# we don't apply security groups for dhcp, router
if (port['port'].get('device_owner') and
port['port']['device_owner'].startswith('network:')):
# Security group Exceptions
-class SecurityGroupAlreadyExists(qexception.InUse):
- # This can only happen if the external_id database is cleared
- message = _("Security group %(name)s id %(external_id)s already exists")
-
-
class SecurityGroupInvalidPortRange(qexception.InvalidInput):
message = _("For TCP/UDP protocols, port_range_min must be "
"<= port_range_max")
message = _("Security group rule already exists. Group id is %(id)s.")
-class SecurityGroupProxyMode(qexception.InUse):
- message = _("Did not recieve external id and in proxy mode")
-
-
-class SecurityGroupNotProxyMode(qexception.InUse):
- message = _("Recieve external id and not in proxy mode")
-
-
-class SecurityGroupProxyModeNotAdmin(qexception.NotAuthorized):
- message = _("In Proxy Mode and not from admin")
-
-
-class SecurityGroupInvalidExternalID(qexception.InvalidInput):
- message = _("external_id wrong type %(data)s")
-
-
def convert_protocol_to_case_insensitive(value):
if value is None:
return value
raise SecurityGroupInvalidPortValue(port=port)
-def convert_to_uuid_or_int_list(value_list):
+def convert_to_uuid_list_or_none(value_list):
if value_list is None:
return
- try:
- return [sg_id if uuidutils.is_uuid_like(sg_id) else int(sg_id)
- for sg_id in value_list]
- except (ValueError, TypeError):
- msg = _("'%s' is not an integer or uuid") % sg_id
- raise qexception.InvalidInput(error_message=msg)
+ for sg_id in value_list:
+ if not uuidutils.is_uuid_like(sg_id):
+ msg = _("'%s' is not an integer or uuid") % sg_id
+ raise qexception.InvalidInput(error_message=msg)
+ return value_list
def _validate_name_not_default(data, valid_values=None):
- if not cfg.CONF.SECURITYGROUP.proxy_mode and data == "default":
+ if data == "default":
raise SecurityGroupDefaultAlreadyExists()
-def _validate_external_id_and_mode(external_id, valid_values=None):
- if not cfg.CONF.SECURITYGROUP.proxy_mode and not external_id:
- return
- elif not cfg.CONF.SECURITYGROUP.proxy_mode and external_id:
- raise SecurityGroupNotProxyMode()
- try:
- int(external_id)
- except (ValueError, TypeError):
- raise SecurityGroupInvalidExternalID(data=external_id)
- if cfg.CONF.SECURITYGROUP.proxy_mode and not external_id:
- raise SecurityGroupProxyMode()
-
attr.validators['type:name_not_default'] = _validate_name_not_default
-attr.validators['type:external_id_and_mode'] = _validate_external_id_and_mode
sg_supported_protocols = [None, 'tcp', 'udp', 'icmp']
sg_supported_ethertypes = ['IPv4', 'IPv6']
'validate': {'type:name_not_default': None}},
'description': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': ''},
- 'external_id': {'allow_post': True, 'allow_put': False,
- 'is_visible': True, 'default': None,
- 'validate': {'type:external_id_and_mode': None}},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
'validate': {'type:uuid': None},
'is_visible': True,
'primary_key': True},
- # external_id can be used to be backwards compatible with nova
- 'external_id': {'allow_post': True, 'allow_put': False,
- 'is_visible': True, 'default': None,
- 'validate': {'type:external_id_and_mode': None}},
'security_group_id': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'required_by_policy': True},
'source_group_id': {'allow_post': True, 'allow_put': False,
'ports': {SECURITYGROUPS: {'allow_post': True,
'allow_put': True,
'is_visible': True,
- 'convert_to': convert_to_uuid_or_int_list,
+ 'convert_to': convert_to_uuid_list_or_none,
'default': attr.ATTR_NOT_SPECIFIED}}}
security_group_quota_opts = [
cfg.IntOpt('quota_security_group',
]
cfg.CONF.register_opts(security_group_quota_opts, 'QUOTAS')
-security_group_opts = [
- cfg.StrOpt('proxy_mode', default=False)
-]
-cfg.CONF.register_opts(security_group_opts, 'SECURITYGROUP')
-
class Securitygroup(extensions.ExtensionDescriptor):
""" Security group extension"""
group and we don't need to check if one exists.
"""
s = security_group.get('security_group')
- if cfg.CONF.SECURITYGROUP.proxy_mode:
- if not context.is_admin:
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
- elif not s.get('external_id'):
- raise ext_sg.SecurityGroupProxyMode()
- elif s.get('external_id'):
- raise ext_sg.SecurityGroupNotProxyMode()
tenant_id = self._get_tenant_id_for_create(context, s)
- if not default_sg and not cfg.CONF.SECURITYGROUP.proxy_mode:
- self._ensure_default_security_group(context, tenant_id,
- security_group)
- if s.get('external_id'):
- filters = {'external_id': [s.get('external_id')]}
- security_groups = super(NvpPluginV2, self).get_security_groups(
- context, filters=filters)
- if security_groups:
- raise ext_sg.SecurityGroupAlreadyExists(
- name=s.get('name', ''), external_id=s.get('external_id'))
+ if not default_sg:
+ self._ensure_default_security_group(context, tenant_id)
+
nvp_secgroup = nvplib.create_security_profile(self.default_cluster,
tenant_id, s)
security_group['security_group']['id'] = nvp_secgroup['uuid']
"""Delete a security group
:param security_group_id: security group rule to remove.
"""
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
-
with context.session.begin(subtransactions=True):
security_group = super(NvpPluginV2, self).get_security_group(
context, security_group_id)
security_group_id = self._validate_security_group_rules(
context, security_group_rule)
- # Check to make sure security group exists and retrieve
- # security_group['id'] needed incase it only has an external_id
+ # Check to make sure security group exists
security_group = super(NvpPluginV2, self).get_security_group(
context, security_group_id)
""" Delete a security group rule
:param sgrid: security group id to remove.
"""
- if (cfg.CONF.SECURITYGROUP.proxy_mode and not context.is_admin):
- raise ext_sg.SecurityGroupProxyModeNotAdmin()
-
with context.session.begin(subtransactions=True):
# determine security profile id
security_group_rule = (
def _get_security_group_rules_nvp_format(self, context, security_group_id,
with_id=False):
- """Query quantum db for security group rules. If external_id is
- provided the external_id will also be returned.
+ """Query quantum db for security group rules.
"""
fields = ['source_ip_prefix', 'source_group_id', 'protocol',
'port_range_min', 'port_range_max', 'protocol', 'ethertype']
return new_taglist
-def set_ext_security_profile_id_tag(external_id, taglist=None):
- """Convenience function to add spid tag to taglist.
-
- :param external_id: the security_profile id from nova
- :param taglist: the taglist to append to (or None).
- :returns: a new taglist that includes the old taglist with the new
- spid tag set."""
- new_taglist = []
- if taglist:
- new_taglist = [x for x in taglist if x['scope'] !=
- EXT_SECURITY_PROFILE_ID_SCOPE]
- if external_id:
- new_taglist.append(dict(scope=EXT_SECURITY_PROFILE_ID_SCOPE,
- tag=str(external_id)))
- return new_taglist
-
-
# -----------------------------------------------------------------------------
# Security Group API Calls
# -----------------------------------------------------------------------------
def create_security_profile(cluster, tenant_id, security_profile):
path = "/ws.v1/security-profile"
tags = set_tenant_id_tag(tenant_id)
- tags = set_ext_security_profile_id_tag(
- security_profile.get('external_id'), tags)
# Allow all dhcp responses in
dhcp = {'logical_port_egress_rules': [{'ethertype': 'IPv4',
'protocol': 17,
class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
- def _create_security_group(self, fmt, name, description, external_id=None,
- **kwargs):
+ def _create_security_group(self, fmt, name, description, **kwargs):
data = {'security_group': {'name': name,
'tenant_id': kwargs.get('tenant_id',
'test_tenant'),
'description': description}}
- if external_id:
- data['security_group']['external_id'] = external_id
security_group_req = self.new_create_request('security-groups', data,
fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
def _build_security_group_rule(self, security_group_id, direction,
protocol, port_range_min, port_range_max,
source_ip_prefix=None, source_group_id=None,
- external_id=None, tenant_id='test_tenant',
+ tenant_id='test_tenant',
ethertype='IPv4'):
data = {'security_group_rule': {'security_group_id': security_group_id,
'port_range_max': port_range_max,
'tenant_id': tenant_id,
'ethertype': ethertype}}
- if external_id:
- data['security_group_rule']['external_id'] = external_id
-
if source_ip_prefix:
data['security_group_rule']['source_ip_prefix'] = source_ip_prefix
context.Context('', kwargs['tenant_id']))
return security_group_rule_req.get_response(self.ext_api)
- def _make_security_group(self, fmt, name, description, external_id=None,
- **kwargs):
- res = self._create_security_group(fmt, name, description,
- external_id, **kwargs)
+ def _make_security_group(self, fmt, name, description, **kwargs):
+ res = self._create_security_group(fmt, name, description, **kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@contextlib.contextmanager
def security_group(self, name='webservers', description='webservers',
- external_id=None, fmt=None, no_delete=False):
+ fmt=None, no_delete=False):
if not fmt:
fmt = self.fmt
- security_group = self._make_security_group(fmt, name, description,
- external_id)
+ security_group = self._make_security_group(fmt, name, description)
try:
yield security_group
finally:
direction='ingress', protocol='tcp',
port_range_min='22', port_range_max='22',
source_ip_prefix=None, source_group_id=None,
- external_id=None, fmt=None, no_delete=False,
- ethertype='IPv4'):
+ fmt=None, no_delete=False, ethertype='IPv4'):
if not fmt:
fmt = self.fmt
rule = self._build_security_group_rule(security_group_id,
port_range_max,
source_ip_prefix,
source_group_id,
- external_id,
ethertype=ethertype)
security_group_rule = self._make_security_group_rule(self.fmt, rule)
try:
for k, v, in keys:
self.assertEqual(security_group['security_group'][k], v)
- def test_create_security_group_external_id(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- name = 'webservers'
- description = 'my webservers'
- external_id = 10
- keys = [('name', name,), ('description', description),
- ('external_id', external_id)]
- with self.security_group(name, description, external_id) as sg:
- for k, v, in keys:
- self.assertEqual(sg['security_group'][k], v)
-
def test_default_security_group(self):
with self.network():
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(len(groups['security_groups']), 1)
- def test_create_security_group_proxy_mode_not_admin(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- res = self._create_security_group(self.fmt, 'webservers',
- 'webservers', '1',
- tenant_id='bad_tenant',
- set_context=True)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 403)
-
- def test_create_security_group_no_external_id_proxy_mode(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- res = self._create_security_group(self.fmt, 'webservers',
- 'webservers')
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 400)
-
- def test_create_security_group_no_external_id_not_proxy_mode(self):
- res = self._create_security_group(self.fmt, 'webservers',
- 'webservers', '1')
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
-
def test_create_default_security_group_fail(self):
name = 'default'
description = 'my webservers'
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
- def test_create_security_group_duplicate_external_id(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- name = 'webservers'
- description = 'my webservers'
- external_id = 1
- with self.security_group(name, description, external_id):
- res = self._create_security_group(self.fmt, name, description,
- external_id)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
-
def test_list_security_groups(self):
with contextlib.nested(self.security_group(name='sg1',
description='sg'),
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 404)
- def test_create_security_group_rule_exteral_id_proxy_mode(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- with self.security_group(external_id=1) as sg:
- rule = {'security_group_rule':
- {'security_group_id': sg['security_group']['id'],
- 'direction': 'ingress',
- 'protocol': 'tcp',
- 'port_range_min': '22',
- 'port_range_max': '22',
- 'external_id': '1',
- 'tenant_id': 'test_tenant',
- 'source_group_id': sg['security_group']['id']}}
-
- res = self._create_security_group_rule(self.fmt, rule)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 201)
-
- def test_create_security_group_rule_exteral_id_not_proxy_mode(self):
- with self.security_group() as sg:
- rule = {'security_group_rule':
- {'security_group_id': sg['security_group']['id'],
- 'direction': 'ingress',
- 'protocol': 'tcp',
- 'port_range_min': '22',
- 'port_range_max': '22',
- 'external_id': 1,
- 'tenant_id': 'test_tenant',
- 'source_group_id': sg['security_group']['id']}}
-
- res = self._create_security_group_rule(self.fmt, rule)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 409)
-
- def test_create_security_group_rule_not_admin(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- with self.security_group(external_id='1') as sg:
- rule = {'security_group_rule':
- {'security_group_id': sg['security_group']['id'],
- 'direction': 'ingress',
- 'protocol': 'tcp',
- 'port_range_min': '22',
- 'port_range_max': '22',
- 'tenant_id': 'bad_tenant',
- 'external_id': 1,
- 'source_group_id': sg['security_group']['id']}}
-
- res = self._create_security_group_rule(self.fmt, rule,
- tenant_id='bad_tenant',
- set_context=True)
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 403)
-
def test_create_security_group_rule_bad_tenant_source_group_id(self):
with self.security_group() as sg:
res = self._create_security_group(self.fmt, 'webservers',
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
- def test_validate_port_external_id_quantum_id(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- with self.network() as n:
- with self.subnet(n):
- sg1 = (self.deserialize(self.fmt,
- self._create_security_group(self.fmt,
- 'foo', 'bar', '1')))
- sg2 = (self.deserialize(self.fmt,
- self._create_security_group(self.fmt,
- 'foo', 'bar', '2')))
- res = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[sg1['security_group']['id']])
-
- port = self.deserialize(self.fmt, res)
- # This request updates the port sending the quantum security
- # group id in and a nova security group id.
- data = {'port': {'fixed_ips': port['port']['fixed_ips'],
- 'name': port['port']['name'],
- ext_sg.SECURITYGROUPS:
- [sg1['security_group']['external_id'],
- sg2['security_group']['id']]}}
- req = self.new_update_request('ports', data,
- port['port']['id'])
- res = self.deserialize(self.fmt, req.get_response(self.api))
- self.assertEquals(len(res['port'][ext_sg.SECURITYGROUPS]), 2)
- for sg_id in res['port'][ext_sg.SECURITYGROUPS]:
- # only security group id's should be
- # returned and not external_ids
- self.assertEquals(len(sg_id), 36)
- self._delete('ports', port['port']['id'])
-
- def test_validate_port_external_id_string_or_int(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- with self.network() as n:
- with self.subnet(n):
- string_id = '1'
- int_id = 2
- self.deserialize(
- self.fmt, self._create_security_group(self.fmt,
- 'foo', 'bar',
- string_id))
- self.deserialize(
- self.fmt, self._create_security_group(self.fmt,
- 'foo', 'bar',
- int_id))
- res = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[string_id, int_id])
-
- port = self.deserialize(self.fmt, res)
- self._delete('ports', port['port']['id'])
-
- def test_create_port_with_non_uuid_or_int(self):
+ def test_create_port_with_non_uuid(self):
with self.network() as n:
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'],
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
- def test_validate_port_external_id_fail(self):
- cfg.CONF.set_override('proxy_mode', True, 'SECURITYGROUP')
- with self.network() as n:
- with self.subnet(n):
- bad_id = 1
- res = self._create_port(
- self.fmt, n['network']['id'],
- security_groups=[bad_id])
-
- self.deserialize(self.fmt, res)
- self.assertEqual(res.status_int, 404)
-
class TestSecurityGroupsXML(TestSecurityGroups):
fmt = 'xml'