if ports:
return ports[0]
+ def _get_router_ids(self, context):
+ """Function to retrieve router IDs for a context without joins"""
+ query = self._model_query(context, l3_db.Router.id)
+ return [row[0] for row in query]
+
def _check_fips_availability_on_host_ext_net(
self, context, host_id, fip_ext_net_id):
"""Query all floating_ips and filter on host and external net."""
fip_count_on_host = 0
with context.session.begin(subtransactions=True):
- routers = self._get_sync_routers(context, router_ids=None)
- router_ids = [router['id'] for router in routers]
+ router_ids = self._get_router_ids(context)
floating_ips = self._get_sync_floating_ips(context, router_ids)
# Check for the active floatingip in the host
for fip in floating_ips:
from neutron.common import constants as l3_const
from neutron.common import exceptions
from neutron import context
+from neutron.db import common_db_mixin
from neutron.db import l3_dvr_db
from neutron import manager
from neutron.openstack.common import uuidutils
result = self._create_router(router)
self.assertEqual(expected, result.extra_attributes['distributed'])
+ def test_router_id_query(self):
+ # need to create an object that has the common db method required
+ class DVRwithCommon(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
+ common_db_mixin.CommonDbMixin):
+ pass
+ self.mixin = DVRwithCommon()
+ routers = [self._create_router({'name': '%s' % x,
+ 'admin_state_up': True})
+ for x in range(10)]
+ expected = [router['id'] for router in routers]
+ router_ids = self.mixin._get_router_ids(self.ctx)
+ self.assertEqual(sorted(expected), sorted(router_ids))
+
def test_create_router_db_default(self):
self._test__create_router_db(expected=False)