self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
- [topics.NETWORK, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.L2POPULATION, topics.UPDATE, cfg.CONF.host]]
self.connection = agent_rpc.create_consumers(self.endpoints,
if vif_id in vlan_mapping.vif_ports:
return network_id
- def network_delete(self, context, **kwargs):
- network_id = kwargs.get('network_id')
- LOG.debug(_("network_delete received network %s"), network_id)
- # The network may not be defined on this agent
- lvm = self.local_vlan_map.get(network_id)
- if lvm:
- self.reclaim_local_vlan(network_id)
- else:
- LOG.debug(_("Network %s not used on agent."), network_id)
-
def port_update(self, context, **kwargs):
port = kwargs.get('port')
# Put the port identifier in the updated_ports set.
self.agent.int_br_device_count
)
- def test_network_delete(self):
- with contextlib.nested(
- mock.patch.object(self.agent, "reclaim_local_vlan"),
- mock.patch.object(self.agent.tun_br, "cleanup_tunnel_port")
- ) as (recl_fn, clean_tun_fn):
- self.agent.network_delete("unused_context",
- network_id="123")
- self.assertFalse(recl_fn.called)
- self.agent.local_vlan_map["123"] = "LVM object"
- self.agent.network_delete("unused_context",
- network_id="123")
- self.assertFalse(clean_tun_fn.called)
- recl_fn.assert_called_with("123")
-
def test_port_update(self):
port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7",
"network_id": "124",