ofports = _ofport_set_to_str(self.tun_br_ofports[tunnel_type].values())
if ofports and not self.l2_pop:
# Update flooding flows to include the new tunnel
- for network_id, vlan_mapping in self.local_vlan_map.iteritems():
+ for vlan_mapping in list(self.local_vlan_map.values()):
if vlan_mapping.network_type == tunnel_type:
br.mod_flow(table=constants.FLOOD_TO_TUN,
dl_vlan=vlan_mapping.vlan,
self.agent.treat_devices_added_or_updated.assert_called_with(
['port1'], ovs_restarted=False)
+ def test__setup_tunnel_port_while_new_mapping_is_added(self):
+ """
+ Test that _setup_tunnel_port doesn't fail if new vlan mapping is
+ added in a different coroutine while iterating over existing mappings.
+ See bug 1449944 for more info.
+ """
+
+ def add_new_vlan_mapping(*args, **kwargs):
+ self.agent.local_vlan_map['bar'] = (
+ ovs_neutron_agent.LocalVLANMapping(1, 2, 3, 4))
+ bridge = mock.Mock()
+ tunnel_type = 'vxlan'
+ self.agent.tun_br_ofports = {tunnel_type: dict()}
+ self.agent.l2_pop = False
+ self.agent.local_vlan_map = {
+ 'foo': ovs_neutron_agent.LocalVLANMapping(4, tunnel_type, 2, 1)}
+ bridge.mod_flow.side_effect = add_new_vlan_mapping
+ with mock.patch('neutron.plugins.openvswitch.agent.ovs_neutron_agent.'
+ '_ofport_set_to_str', return_value=True):
+ self.agent._setup_tunnel_port(bridge, 1, 2,
+ tunnel_type=tunnel_type)
+ self.assertIn('bar', self.agent.local_vlan_map)
+
class AncillaryBridgesTest(base.BaseTestCase):