from neutron.common import utils
from neutron.db import agents_db
from neutron.db import model_base
+from neutron.extensions import agent as ext_agent
from neutron.extensions import dhcpagentscheduler
from neutron.openstack.common import log as logging
return {'networks': []}
def list_active_networks_on_active_dhcp_agent(self, context, host):
- agent = self._get_agent_by_type_and_host(
- context, constants.AGENT_TYPE_DHCP, host)
+ try:
+ agent = self._get_agent_by_type_and_host(
+ context, constants.AGENT_TYPE_DHCP, host)
+ except ext_agent.AgentNotFoundByTypeHost:
+ LOG.debug("DHCP Agent not found on host %s", host)
+ return []
+
if not agent.admin_state_up:
return []
query = context.session.query(NetworkDhcpAgentBinding.network_id)
self.assertEqual(1, num_before_remove)
self.assertEqual(0, num_after_remove)
+ def test_list_active_networks_on_not_registered_yet_dhcp_agent(self):
+ plugin = manager.NeutronManager.get_plugin()
+ nets = plugin.list_active_networks_on_active_dhcp_agent(
+ self.adminContext, host=DHCP_HOSTA)
+ self.assertEqual([], nets)
+
def test_reserved_port_after_network_remove_from_dhcp_agent(self):
dhcp_hosta = {
'binary': 'neutron-dhcp-agent',