from sqlalchemy.orm import scoped_session
from quantum.api.v2 import attributes as attr
-from quantum.common import utils
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import securitygroup as ext_sg
security_group_id = self._get_port_security_group_bindings(
context, filters, fields)
- port[ext_sg.SECURITYGROUP] = []
+ port[ext_sg.SECURITYGROUPS] = []
for security_group_id in security_group_id:
- port[ext_sg.SECURITYGROUP].append(
+ port[ext_sg.SECURITYGROUPS].append(
security_group_id['security_group_id'])
return port
def _validate_security_groups_on_port(self, context, port):
p = port['port']
- if not attr.is_attr_set(p.get(ext_sg.SECURITYGROUP)):
+ if not attr.is_attr_set(p.get(ext_sg.SECURITYGROUPS)):
return
if p.get('device_owner') and p['device_owner'].startswith('network:'):
raise ext_sg.SecurityGroupInvalidDeviceOwner()
valid_groups = self.get_security_groups(context, fields={'id': None})
valid_groups_set = set([x['id'] for x in valid_groups])
- req_sg_set = set(p[ext_sg.SECURITYGROUP])
+ req_sg_set = set(p[ext_sg.SECURITYGROUPS])
invalid_sg_set = req_sg_set - valid_groups_set
if invalid_sg_set:
msg = ' '.join(str(x) for x in invalid_sg_set)
tenant_id = self._get_tenant_id_for_create(context,
port['port'])
default_sg = self._ensure_default_security_group(context, tenant_id)
- if attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUP)):
- sgids = port['port'].get(ext_sg.SECURITYGROUP)
+ if attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
+ sgids = port['port'].get(ext_sg.SECURITYGROUPS)
else:
sgids = [default_sg]
- port['port'][ext_sg.SECURITYGROUP] = sgids
+ port['port'][ext_sg.SECURITYGROUPS] = sgids
}
-SECURITYGROUP = 'security_groups'
+SECURITYGROUPS = 'security_groups'
EXTENDED_ATTRIBUTES_2_0 = {
- 'ports': {SECURITYGROUP: {'allow_post': True,
- 'allow_put': True,
- 'is_visible': True,
- 'default': attr.ATTR_NOT_SPECIFIED}}}
+ 'ports': {SECURITYGROUPS: {'allow_post': True,
+ 'allow_put': True,
+ 'is_visible': True,
+ 'default': attr.ATTR_NOT_SPECIFIED}}}
security_group_quota_opts = [
cfg.IntOpt('quota_security_group',
default=10,
with session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
self._validate_security_groups_on_port(context, port)
- sgids = port['port'].get(ext_sg.SECURITYGROUP)
+ sgids = port['port'].get(ext_sg.SECURITYGROUPS)
port = super(LinuxBridgePluginV2,
self).create_port(context, port)
self._process_port_create_security_group(
self.notifier.security_groups_provider_updated(context)
else:
self.notifier.security_groups_member_updated(
- context, port.get(ext_sg.SECURITYGROUP))
+ context, port.get(ext_sg.SECURITYGROUPS))
return self._extend_port_dict_binding(context, port)
def update_port(self, context, id, port):
port_updated = False
with session.begin(subtransactions=True):
# delete the port binding and read it with the new rules
- if ext_sg.SECURITYGROUP in port['port']:
+ if ext_sg.SECURITYGROUPS in port['port']:
self._delete_port_security_group_bindings(context, id)
self._process_port_create_security_group(
context,
id,
- port['port'][ext_sg.SECURITYGROUP])
+ port['port'][ext_sg.SECURITYGROUPS])
port_updated = True
port = super(LinuxBridgePluginV2, self).update_port(
if (original_port['fixed_ips'] != port['fixed_ips'] or
not utils.compare_elements(
- original_port.get(ext_sg.SECURITYGROUP),
- port.get(ext_sg.SECURITYGROUP))):
+ original_port.get(ext_sg.SECURITYGROUPS),
+ port.get(ext_sg.SECURITYGROUPS))):
self.notifier.security_groups_member_updated(
- context, port.get(ext_sg.SECURITYGROUP))
+ context, port.get(ext_sg.SECURITYGROUPS))
if port_updated:
self._notify_port_updated(context, port)
self._delete_port_security_group_bindings(context, id)
super(LinuxBridgePluginV2, self).delete_port(context, id)
self.notifier.security_groups_member_updated(
- context, port.get(ext_sg.SECURITYGROUP))
+ context, port.get(ext_sg.SECURITYGROUPS))
def _notify_port_updated(self, context, port):
binding = db.get_network_binding(context.session,
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
- ext_sg.SECURITYGROUP:
+ ext_sg.SECURITYGROUPS:
[security_group_id]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
- self.assertEquals(res['port'][ext_sg.SECURITYGROUP][0],
+ self.assertEquals(res['port'][ext_sg.SECURITYGROUPS][0],
security_group_id)
self._delete('ports', port['port']['id'])
self.notifier.assert_has_calls(
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
- ext_sg.SECURITYGROUP:
+ ext_sg.SECURITYGROUPS:
[security_group_id]}}
req = self.new_update_request('ports', data,
port_dict = lb_db.get_port_from_device(device_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
- port_dict[ext_sg.SECURITYGROUP])
+ port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
def create_port(self, context, port):
tenant_id = self._get_tenant_id_for_create(context, port['port'])
default_sg = self._ensure_default_security_group(context, tenant_id)
- if not port['port'].get(ext_sg.SECURITYGROUP):
- port['port'][ext_sg.SECURITYGROUP] = [default_sg]
+ if not port['port'].get(ext_sg.SECURITYGROUPS):
+ port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
self._validate_security_groups_on_port(context, port)
session = context.session
with session.begin(subtransactions=True):
- sgids = port['port'].get(ext_sg.SECURITYGROUP)
+ sgids = port['port'].get(ext_sg.SECURITYGROUPS)
port = super(SecurityGroupTestPlugin, self).create_port(context,
port)
self._process_port_create_security_group(context, port['id'],
def update_port(self, context, id, port):
session = context.session
with session.begin(subtransactions=True):
- if ext_sg.SECURITYGROUP in port['port']:
+ if ext_sg.SECURITYGROUPS in port['port']:
self._validate_security_groups_on_port(context, port)
# delete the port binding and read it with the new rules
self._delete_port_security_group_bindings(context, id)
- self._process_port_create_security_group(context, id,
- port['port'].get(
- ext_sg.SECURITYGROUP))
+ self._process_port_create_security_group(
+ context, id, port['port'].get(ext_sg.SECURITYGROUPS))
port = super(SecurityGroupTestPlugin, self).update_port(
context, id, port)
self._extend_port_dict_security_group(context, port)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
'name': port['port']['name'],
- ext_sg.SECURITYGROUP:
+ ext_sg.SECURITYGROUPS:
[sg['security_group']['id']]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
- self.assertEqual(res['port'][ext_sg.SECURITYGROUP][0],
+ self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
# Test update port without security group
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
- self.assertEqual(res['port'][ext_sg.SECURITYGROUP][0],
+ self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
self._delete('ports', port['port']['id'])
sg2['security_group']['id']])
port = self.deserialize('json', res)
self.assertEqual(len(
- port['port'][ext_sg.SECURITYGROUP]), 2)
+ port['port'][ext_sg.SECURITYGROUPS]), 2)
self._delete('ports', port['port']['id'])
def test_update_port_remove_security_group(self):
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
- self.assertEqual(res['port'].get(ext_sg.SECURITYGROUP),
+ self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),
[])
self._delete('ports', port['port']['id'])
security_groups=(
[sg['security_group']['id']]))
port = self.deserialize('json', res)
- self.assertEqual(port['port'][ext_sg.SECURITYGROUP][0],
+ self.assertEqual(port['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
# try to delete security group that's in use
res = self._delete('security-groups',