# License for the specific language governing permissions and limitations
# under the License.
-import collections
-
from sqlalchemy import or_
from sqlalchemy.orm import exc
def get_sg_ids_grouped_by_port(port_ids):
- sg_ids_grouped_by_port = collections.defaultdict(list)
+ sg_ids_grouped_by_port = {}
session = db_api.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = query.filter(or_(*or_criteria))
for port, sg_id in query:
+ if port not in sg_ids_grouped_by_port:
+ sg_ids_grouped_by_port[port] = []
if sg_id:
sg_ids_grouped_by_port[port].append(sg_id)
return sg_ids_grouped_by_port
self.fmt, net_id, security_groups=[sg['security_group']['id']])
return port['port']
+ def _make_port_without_sec_group(self, net_id):
+ port = self._make_port(
+ self.fmt, net_id, security_groups=[])
+ return port['port']
+
def test_security_group_get_ports_from_devices(self):
with self.network() as n:
with self.subnet(n):
- port1 = self._make_port_with_new_sec_group(n['network']['id'])
- port2 = self._make_port_with_new_sec_group(n['network']['id'])
+ orig_ports = [
+ self._make_port_with_new_sec_group(n['network']['id']),
+ self._make_port_with_new_sec_group(n['network']['id']),
+ self._make_port_without_sec_group(n['network']['id'])
+ ]
plugin = manager.NeutronManager.get_plugin()
# should match full ID and starting chars
ports = plugin.get_ports_from_devices(
- [port1['id'], port2['id'][0:8]])
- self.assertEqual(2, len(ports))
+ [orig_ports[0]['id'], orig_ports[1]['id'][0:8],
+ orig_ports[2]['id']])
+ self.assertEqual(len(orig_ports), len(ports))
for port_dict in ports:
- p = port1 if port1['id'] == port_dict['id'] else port2
+ p = next(p for p in orig_ports
+ if p['id'] == port_dict['id'])
self.assertEqual(p['id'], port_dict['id'])
self.assertEqual(p['security_groups'],
port_dict[ext_sg.SECURITYGROUPS])