self._check_ovs_version()
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
+ # Collect additional bridges to monitor
+ self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)
self.agent_state = {
'binary': 'neutron-openvswitch-agent',
'host': cfg.CONF.host,
int_br.add_flow(priority=1, actions="normal")
return int_br
+ def setup_ancillary_bridges(self, integ_br, tun_br):
+ '''Setup ancillary bridges - for example br-ex.'''
+ ovs_bridges = set(ovs_lib.get_bridges(self.root_helper))
+ # Remove all known bridges
+ ovs_bridges.remove(integ_br)
+ if self.enable_tunneling:
+ ovs_bridges.remove(tun_br)
+ br_names = [self.phys_brs[physical_network].br_name for
+ physical_network in self.phys_brs]
+ ovs_bridges.difference_update(br_names)
+ # Filter list of bridges to those that have external
+ # bridge-id's configured
+ br_names = []
+ for bridge in ovs_bridges:
+ id = ovs_lib.get_bridge_external_bridge_id(self.root_helper,
+ bridge)
+ if id != bridge:
+ br_names.append(bridge)
+ ovs_bridges.difference_update(br_names)
+ ancillary_bridges = []
+ for bridge in ovs_bridges:
+ br = ovs_lib.OVSBridge(bridge, self.root_helper)
+ LOG.info(_('Adding %s to list of bridges.'), bridge)
+ ancillary_bridges.append(br)
+ return ancillary_bridges
+
def setup_tunnel_br(self, tun_br):
'''Setup the tunnel bridge.
'added': added,
'removed': removed}
+ def update_ancillary_ports(self, registered_ports):
+ ports = set()
+ for bridge in self.ancillary_brs:
+ ports |= bridge.get_vif_port_set()
+
+ if ports == registered_ports:
+ return
+ added = ports - registered_ports
+ removed = registered_ports - ports
+ return {'current': ports,
+ 'added': added,
+ 'removed': removed}
+
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
physical_network, segmentation_id, admin_state_up):
if vif_port:
self.port_dead(port)
return resync
+ def treat_ancillary_devices_added(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info(_("Ancillary Port %s added"), device)
+ try:
+ self.plugin_rpc.get_device_details(self.context, device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug(_("Unable to get port details for "
+ "%(device)s: %(e)s"),
+ {'device': device, 'e': e})
+ resync = True
+ continue
+ return resync
+
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
self.port_unbound(device)
return resync
+ def treat_ancillary_devices_removed(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info(_("Attachment %s removed"), device)
+ try:
+ details = self.plugin_rpc.update_device_down(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
+ {'device': device, 'e': e})
+ resync = True
+ continue
+ if details['exists']:
+ LOG.info(_("Port %s updated."), device)
+ # Nothing to do regarding local networking
+ else:
+ LOG.debug(_("Device %s not defined on plugin"), device)
+ return resync
+
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
# If one of the above opertaions fails => resync with plugin
return (resync_a | resync_b)
+ def process_ancillary_network_ports(self, port_info):
+ resync_a = False
+ resync_b = False
+ if 'added' in port_info:
+ resync_a = self.treat_ancillary_devices_added(port_info['added'])
+ if 'removed' in port_info:
+ resync_b = self.treat_ancillary_devices_removed(
+ port_info['removed'])
+ # If one of the above opertaions fails => resync with plugin
+ return (resync_a | resync_b)
+
def tunnel_sync(self):
resync = False
try:
def rpc_loop(self):
sync = True
ports = set()
+ ancillary_ports = set()
tunnel_sync = True
while True:
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
+ ancillary_ports.clear()
sync = False
# Notify the plugin of tunnel IP
sync = self.process_network_ports(port_info)
ports = port_info['current']
+ # Treat ancillary devices if they exist
+ if self.ancillary_brs:
+ port_info = self.update_ancillary_ports(ancillary_ports)
+ if port_info:
+ rc = self.process_ancillary_network_ports(port_info)
+ ancillary_ports = port_info['current']
+ sync = sync | rc
+
except Exception:
LOG.exception(_("Error in agent event loop"))
sync = True
mock.patch('neutron.plugins.openvswitch.agent.ovs_neutron_agent.'
'OVSNeutronAgent.setup_integration_br',
return_value=mock.Mock()),
+ mock.patch('neutron.plugins.openvswitch.agent.ovs_neutron_agent.'
+ 'OVSNeutronAgent.setup_ancillary_bridges',
+ return_value=[]),
mock.patch('neutron.agent.linux.utils.get_interface_mac',
return_value='00:00:00:00:00:01')):
self.agent = ovs_neutron_agent.OVSNeutronAgent(**kwargs)
self._check_ovs_vxlan_version('1.10', '1.9',
constants.MINIMUM_OVS_VXLAN_VERSION,
expecting_ok=False)
+
+
+class AncillaryBridgesTest(base.BaseTestCase):
+
+ def setUp(self):
+ super(AncillaryBridgesTest, self).setUp()
+ self.addCleanup(cfg.CONF.reset)
+ self.addCleanup(mock.patch.stopall)
+ notifier_p = mock.patch(NOTIFIER)
+ notifier_cls = notifier_p.start()
+ self.notifier = mock.Mock()
+ notifier_cls.return_value = self.notifier
+ # Avoid rpc initialization for unit tests
+ cfg.CONF.set_override('rpc_backend',
+ 'neutron.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_override('report_interval', 0, 'AGENT')
+ self.kwargs = ovs_neutron_agent.create_agent_config_map(cfg.CONF)
+
+ def _test_ancillary_bridges(self, bridges, ancillary):
+ device_ids = ancillary[:]
+
+ def pullup_side_effect(self, *args):
+ result = device_ids.pop(0)
+ return result
+
+ with contextlib.nested(
+ mock.patch('neutron.plugins.openvswitch.agent.ovs_neutron_agent.'
+ 'OVSNeutronAgent.setup_integration_br',
+ return_value=mock.Mock()),
+ mock.patch('neutron.agent.linux.utils.get_interface_mac',
+ return_value='00:00:00:00:00:01'),
+ mock.patch('neutron.agent.linux.ovs_lib.get_bridges',
+ return_value=bridges),
+ mock.patch(
+ 'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id',
+ side_effect=pullup_side_effect)):
+ self.agent = ovs_neutron_agent.OVSNeutronAgent(**self.kwargs)
+ self.assertEqual(len(ancillary), len(self.agent.ancillary_brs))
+ if ancillary:
+ bridges = [br.br_name for br in self.agent.ancillary_brs]
+ for br in ancillary:
+ self.assertIn(br, bridges)
+
+ def test_ancillary_bridges_single(self):
+ bridges = ['br-int', 'br-ex']
+ self._test_ancillary_bridges(bridges, ['br-ex'])
+
+ def test_ancillary_bridges_none(self):
+ bridges = ['br-int']
+ self._test_ancillary_bridges(bridges, [])
+
+ def test_ancillary_bridges_multiple(self):
+ bridges = ['br-int', 'br-ex1', 'br-ex2']
+ self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])