# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.PORT, topics.DELETE],
- [topics.NETWORK, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
if vif_id in vlan_mapping.vif_ports:
return network_id
- def network_delete(self, context, **kwargs):
- LOG.debug("network_delete received")
- network_id = kwargs.get('network_id')
- LOG.debug("Delete %s", network_id)
- # The network may not be defined on this agent
- lvm = self.local_vlan_map.get(network_id)
- if lvm:
- self.reclaim_local_vlan(network_id)
- else:
- LOG.debug("Network %s not used on agent.", network_id)
-
def port_update(self, context, **kwargs):
port = kwargs.get('port')
# Put the port identifier in the updated_ports set.
report_st.assert_called_with(self.agent.context,
self.agent.agent_state, True)
- def test_network_delete(self):
- with mock.patch.object(self.agent, "reclaim_local_vlan") as recl_fn,\
- mock.patch.object(self.agent.tun_br,
- "cleanup_tunnel_port") as clean_tun_fn:
- self.agent.network_delete("unused_context",
- network_id="123")
- self.assertFalse(recl_fn.called)
- self.agent.local_vlan_map["123"] = "LVM object"
- self.agent.network_delete("unused_context",
- network_id="123")
- self.assertFalse(clean_tun_fn.called)
- recl_fn.assert_called_with("123")
-
def test_port_update(self):
port = {"id": "123",
"network_id": "124",