'payload': payload})
return enabled_agents
+ def _is_reserved_dhcp_port(self, port):
+ return port.get('device_id') == constants.DEVICE_ID_RESERVED_DHCP_PORT
+
def _notify_agents(self, context, method, payload, network_id):
"""Notify all the agents that are hosting the network."""
# fanout is required as we do not know who is "listening"
context, [network_id])
# schedule the network first, if needed
- schedule_required = method == 'port_create_end'
+ schedule_required = (
+ method == 'port_create_end' and
+ not self._is_reserved_dhcp_port(payload['port']))
if schedule_required:
agents = self._schedule_network(admin_ctx, network, agents)
agent.admin_state_up = True
agent.heartbeat_timestamp = timeutils.utcnow()
g.return_value = [agent]
+ dummy_payload = {'port': {}}
self.notifier._notify_agents(mock.Mock(), method,
- mock.ANY, 'foo_network_id')
+ dummy_payload, 'foo_network_id')
self.assertEqual(expected_scheduling, f.call_count)
self.assertEqual(expected_casts, self.mock_cast.call_count)
for expected in expected_calls[DHCP_HOSTC]:
self.assertIn(expected, self.dhcp_notifier_cast.call_args_list)
+ def _is_schedule_network_called(self, device_id):
+ plugin = manager.NeutronManager.get_plugin()
+ notifier = plugin.agent_notifiers[constants.AGENT_TYPE_DHCP]
+ with contextlib.nested(
+ self.subnet(),
+ mock.patch.object(plugin,
+ 'get_dhcp_agents_hosting_networks',
+ return_value=[]),
+ mock.patch.object(notifier,
+ '_schedule_network',
+ return_value=[])
+ ) as (subnet, _, mock_sched):
+ with self.port(subnet=subnet, device_id=device_id):
+ return mock_sched.called
+
+ def test_reserved_dhcp_port_creation(self):
+ device_id = constants.DEVICE_ID_RESERVED_DHCP_PORT
+ self.assertFalse(self._is_schedule_network_called(device_id))
+
+ def test_unreserved_dhcp_port_creation(self):
+ device_id = 'not_reserved'
+ self.assertTrue(self._is_schedule_network_called(device_id))
+
class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
test_agent_ext_plugin.AgentDBTestMixIn,