self.dont_fragment = cfg.CONF.AGENT.dont_fragment
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
- # Collect additional bridges to monitor
- self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)
# Security group agent support
self.sg_agent = OFASecurityGroupAgent(self.context,
instructions=instructions)
self.ryu_send_msg(msg)
- def setup_ancillary_bridges(self, integ_br, tun_br):
- """Setup ancillary bridges - for example br-ex."""
- ovs_bridges = set(ovs_lib.get_bridges(self.root_helper))
- # Remove all known bridges
- ovs_bridges.remove(integ_br)
- if self.enable_tunneling:
- ovs_bridges.remove(tun_br)
- br_names = [self.phys_brs[physical_network].br_name for
- physical_network in self.phys_brs]
- ovs_bridges.difference_update(br_names)
- # Filter list of bridges to those that have external
- # bridge-id's configured
- br_names = [
- bridge for bridge in ovs_bridges
- if bridge != ovs_lib.get_bridge_external_bridge_id(
- self.root_helper, bridge)
- ]
- ovs_bridges.difference_update(br_names)
- ancillary_bridges = []
- for bridge in ovs_bridges:
- br = OVSBridge(bridge, self.root_helper, self.ryuapp)
- ancillary_bridges.append(br)
- LOG.info(_('ancillary bridge list: %s.'), ancillary_bridges)
- return ancillary_bridges
-
def _tun_br_sort_incoming_traffic_depend_in_port(self, br):
match = br.ofparser.OFPMatch(
in_port=int(self.patch_int_ofport))
changed_ports.add(port)
return changed_ports
- def update_ancillary_ports(self, registered_ports):
- # TODO(yamamoto): stop using ovsdb
- # - do the same as scan_ports
- # - or, find a way to update status of ancillary ports differently
- # eg. let interface drivers mark ports up
- ports = set()
- for bridge in self.ancillary_brs:
- ports |= bridge.get_vif_port_set()
-
- if ports == registered_ports:
- return
- added = ports - registered_ports
- removed = registered_ports - ports
- return {'current': ports,
- 'added': added,
- 'removed': removed}
-
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
physical_network, segmentation_id, admin_state_up):
if vif_port:
self.port_dead(port)
return resync
- def treat_ancillary_devices_added(self, devices):
- resync = False
- for device in devices:
- LOG.info(_("Ancillary Port %s added"), device)
- try:
- self.plugin_rpc.get_device_details(self.context, device,
- self.agent_id)
- except Exception as e:
- LOG.debug(_("Unable to get port details for "
- "%(device)s: %(e)s"),
- {'device': device, 'e': e})
- resync = True
- continue
-
- # update plugin about port status
- self.plugin_rpc.update_device_up(self.context,
- device,
- self.agent_id,
- cfg.CONF.host)
- return resync
-
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
self.port_unbound(device)
return resync
- def treat_ancillary_devices_removed(self, devices):
- resync = False
- for device in devices:
- LOG.info(_("Attachment %s removed"), device)
- try:
- details = self.plugin_rpc.update_device_down(self.context,
- device,
- self.agent_id,
- cfg.CONF.host)
- except Exception as e:
- LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
- {'device': device, 'e': e})
- resync = True
- continue
- if details['exists']:
- LOG.info(_("Port %s updated."), device)
- # Nothing to do regarding local networking
- else:
- LOG.debug(_("Device %s not defined on plugin"), device)
- return resync
-
def process_network_ports(self, port_info):
resync_add = False
resync_removed = False
# If one of the above opertaions fails => resync with plugin
return (resync_add | resync_removed)
- def process_ancillary_network_ports(self, port_info):
- resync_add = False
- resync_removed = False
- if 'added' in port_info:
- start = time.time()
- resync_add = self.treat_ancillary_devices_added(port_info['added'])
- LOG.debug(_("process_ancillary_network_ports - iteration: "
- "%(iter_num)d - treat_ancillary_devices_added "
- "completed in %(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- if 'removed' in port_info:
- start = time.time()
- resync_removed = self.treat_ancillary_devices_removed(
- port_info['removed'])
- LOG.debug(_("process_ancillary_network_ports - iteration: "
- "%(iter_num)d - treat_ancillary_devices_removed "
- "completed in %(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
-
- # If one of the above opertaions fails => resync with plugin
- return (resync_add | resync_removed)
-
def tunnel_sync(self):
resync = False
try:
sync = True
ports = set()
updated_ports_copy = set()
- ancillary_ports = set()
tunnel_sync = True
while True:
start = time.time()
- port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0},
- 'ancillary': {'added': 0, 'removed': 0}}
+ port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}}
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%d started"),
self.iter_num)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
- ancillary_ports.clear()
sync = False
polling_manager.force_polling()
# Notify the plugin of tunnel IP
len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
- # Treat ancillary devices if they exist
- if self.ancillary_brs:
- port_info = self.update_ancillary_ports(
- ancillary_ports)
- LOG.debug(_("Agent ovsdb_monitor_loop - "
- "iteration:%(iter_num)d - "
- "ancillary port info retrieved. "
- "Elapsed:%(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
-
- if port_info:
- rc = self.process_ancillary_network_ports(
- port_info)
- LOG.debug(_("Agent ovsdb_monitor_loop - "
- "iteration:"
- "%(iter_num)d - ancillary ports "
- "processed. Elapsed:%(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- ancillary_ports = port_info['current']
- port_stats['ancillary']['added'] = (
- len(port_info.get('added', [])))
- port_stats['ancillary']['removed'] = (
- len(port_info.get('removed', [])))
- sync = sync | rc
polling_manager.polling_completed()
except Exception:
mock.patch.object(self.mod_agent.OFANeutronAgent,
'setup_integration_br',
return_value=mock.Mock()),
- mock.patch.object(self.mod_agent.OFANeutronAgent,
- 'setup_ancillary_bridges',
- return_value=[]),
mock.patch.object(self.mod_agent.OVSBridge,
'get_local_port_mac',
return_value='00:00:00:00:00:01'),
ovs_br_class, tun_output_ctrl):
self.agent.setup_tunnel_br(cfg.CONF.OVS.tunnel_bridge)
tun_output_ctrl.assert_called_once_with(self.agent.tun_br)
-
-
-class AncillaryBridgesTest(ofa_test_base.OFAAgentTestBase):
-
- def setUp(self):
- super(AncillaryBridgesTest, self).setUp()
- notifier_p = mock.patch(NOTIFIER)
- notifier_cls = notifier_p.start()
- self.notifier = mock.Mock()
- notifier_cls.return_value = self.notifier
- # Avoid rpc initialization for unit tests
- cfg.CONF.set_override('rpc_backend',
- 'neutron.openstack.common.rpc.impl_fake')
- cfg.CONF.set_override('report_interval', 0, 'AGENT')
- self.kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
-
- def _test_ancillary_bridges(self, bridges, ancillary):
- device_ids = ancillary[:]
-
- def pullup_side_effect(self, *args):
- result = device_ids.pop(0)
- return result
-
- with contextlib.nested(
- mock.patch.object(self.mod_agent.OFANeutronAgent,
- 'setup_integration_br',
- return_value=mock.Mock()),
- mock.patch('neutron.agent.linux.utils.get_interface_mac',
- return_value='00:00:00:00:00:01'),
- mock.patch.object(self.mod_agent.OVSBridge,
- 'get_local_port_mac',
- return_value='00:00:00:00:00:01'),
- mock.patch('neutron.agent.linux.ovs_lib.get_bridges',
- return_value=bridges),
- mock.patch(
- 'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id',
- side_effect=pullup_side_effect)):
- self.agent = self.mod_agent.OFANeutronAgent(
- self.ryuapp, **self.kwargs)
- self.assertEqual(len(ancillary), len(self.agent.ancillary_brs))
- if ancillary:
- bridges = [br.br_name for br in self.agent.ancillary_brs]
- for br in ancillary:
- self.assertIn(br, bridges)
-
- def test_ancillary_bridges_single(self):
- bridges = ['br-int', 'br-ex']
- self._test_ancillary_bridges(bridges, ['br-ex'])
-
- def test_ancillary_bridges_none(self):
- bridges = ['br-int']
- self._test_ancillary_bridges(bridges, [])
-
- def test_ancillary_bridges_multiple(self):
- bridges = ['br-int', 'br-ex1', 'br-ex2']
- self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])