requested_groups = set(port_sg)
port_sg_missing = requested_groups - valid_groups
if port_sg_missing:
- raise ext_sg.SecurityGroupNotFound(id=str(port_sg_missing[0]))
+ raise ext_sg.SecurityGroupNotFound(id=', '.join(port_sg_missing))
return requested_groups
import mock
import oslo_db.exception as exc
+import testtools
import webob.exc
from neutron.api.v2 import attributes as attr
from neutron.db import db_base_plugin_v2
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg
+from neutron import manager
from neutron.tests import base
from neutron.tests.unit import test_db_plugin
for k, v, in keys:
self.assertEqual(sg_rule[0][k], v)
+ def test_get_security_group_on_port_from_wrong_tenant(self):
+ plugin = manager.NeutronManager.get_plugin()
+ if not hasattr(plugin, '_get_security_groups_on_port'):
+ self.skipTest("plugin doesn't use the mixin with this method")
+ neutron_context = context.get_admin_context()
+ res = self._create_security_group(self.fmt, 'webservers', 'webservers',
+ tenant_id='bad_tenant')
+ sg1 = self.deserialize(self.fmt, res)
+ with testtools.ExpectedException(ext_sg.SecurityGroupNotFound):
+ plugin._get_security_groups_on_port(
+ neutron_context,
+ {'port': {'security_groups': [sg1['security_group']['id']],
+ 'tenant_id': 'tenant'}}
+ )
+
def test_delete_security_group(self):
name = 'webservers'
description = 'my webservers'