from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import ip_lib
+from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
port_info['removed'] = registered_ports - cur_ports
return port_info
+ def process_ports_events(self, events, registered_ports, ancillary_ports,
+ updated_ports=None):
+ port_info = {}
+ port_info['added'] = set()
+ port_info['removed'] = set()
+ port_info['current'] = registered_ports
+
+ ancillary_port_info = {}
+ ancillary_port_info['added'] = set()
+ ancillary_port_info['removed'] = set()
+ ancillary_port_info['current'] = (
+ ancillary_ports if ancillary_ports else set())
+
+ # if a port was added and then removed or viceversa since the agent
+ # can't know the order of the operations, check the status of the port
+ # to determine if the port was added or deleted
+ device_removed_or_added = [
+ dev for dev in events['added'] if dev in events['removed']]
+ for device in device_removed_or_added:
+ if ovs_lib.BaseOVS().port_exists(device['name']):
+ events['removed'].remove(device)
+ else:
+ events['added'].remove(device)
+
+ #TODO(rossella_s): scanning the ancillary bridge won't be needed
+ # anymore when https://review.openstack.org/#/c/203381 since the bridge
+ # id stored in external_ids will be used to identify the bridge the
+ # port belongs to
+ cur_ancillary_ports = set()
+ for bridge in self.ancillary_brs:
+ cur_ancillary_ports |= bridge.get_vif_port_set()
+ cur_ancillary_ports |= ancillary_port_info['current']
+
+ def _process_device(device, devices, ancillary_devices):
+ # check 'iface-id' is set otherwise is not a port
+ # the agent should care about
+ if 'attached-mac' in device.get('external_ids', []):
+ iface_id = self.int_br.portid_from_external_ids(
+ device['external_ids'])
+ if iface_id:
+ if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
+ #TODO(rossella_s) it's extreme to trigger a full resync
+ # if a port is not ready, resync only the device that
+ # is not ready
+ raise Exception(
+ _("Port %s is not ready, resync needed") % device[
+ 'name'])
+ # check if device belong to ancillary bridge
+ if iface_id in cur_ancillary_ports:
+ ancillary_devices.add(iface_id)
+ else:
+ devices.add(iface_id)
+
+ for device in events['added']:
+ _process_device(device, port_info['added'],
+ ancillary_port_info['added'])
+ for device in events['removed']:
+ _process_device(device, port_info['removed'],
+ ancillary_port_info['removed'])
+
+ if updated_ports is None:
+ updated_ports = set()
+ updated_ports.update(self.check_changed_vlans())
+
+ # Disregard devices that were never noticed by the agent
+ port_info['removed'] &= port_info['current']
+ port_info['current'] |= port_info['added']
+ port_info['current'] -= port_info['removed']
+
+ ancillary_port_info['removed'] &= ancillary_port_info['current']
+ ancillary_port_info['current'] |= ancillary_port_info['added']
+ ancillary_port_info['current'] -= ancillary_port_info['removed']
+
+ if updated_ports:
+ # Some updated ports might have been removed in the
+ # meanwhile, and therefore should not be processed.
+ # In this case the updated port won't be found among
+ # current ports.
+ updated_ports &= port_info['current']
+ port_info['updated'] = updated_ports
+ return port_info, ancillary_port_info
+
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
+ def process_port_info(self, start, polling_manager, sync, ovs_restarted,
+ ports, ancillary_ports, updated_ports_copy,
+ consecutive_resyncs):
+ # There are polling managers that don't have get_events, e.g.
+ # AlwaysPoll used by windows implementations
+ # REVISIT (rossella_s) This needs to be reworked to hide implementation
+ # details regarding polling in BasePollingManager subclasses
+ if sync or not (hasattr(polling_manager, 'get_events')):
+ if sync:
+ LOG.info(_LI("Agent out of sync with plugin!"))
+ consecutive_resyncs = consecutive_resyncs + 1
+ if (consecutive_resyncs >=
+ constants.MAX_DEVICE_RETRIES):
+ LOG.warn(_LW(
+ "Clearing cache of registered ports,"
+ " retries to resync were > %s"),
+ constants.MAX_DEVICE_RETRIES)
+ ports.clear()
+ ancillary_ports.clear()
+ consecutive_resyncs = 0
+ else:
+ consecutive_resyncs = 0
+
+ # NOTE(rossella_s) don't empty the queue of events
+ # calling polling_manager.get_events() since
+ # the agent might miss some event (for example a port
+ # deletion)
+ reg_ports = (set() if ovs_restarted else ports)
+ port_info = self.scan_ports(reg_ports, sync,
+ updated_ports_copy)
+ # Treat ancillary devices if they exist
+ if self.ancillary_brs:
+ ancillary_port_info = self.scan_ancillary_ports(
+ ancillary_ports, sync)
+ LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
+ " - ancillary port info retrieved. "
+ "Elapsed:%(elapsed).3f",
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ else:
+ ancillary_port_info = {}
+
+ else:
+ consecutive_resyncs = 0
+ events = polling_manager.get_events()
+ ancillary_ports = (
+ ancillary_ports if self.ancillary_brs else None)
+ port_info, ancillary_port_info = (
+ self.process_ports_events(events, ports,
+ ancillary_ports, updated_ports_copy))
+ return port_info, ancillary_port_info, consecutive_resyncs
+
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
- sync = True
+ sync = False
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
- if sync:
- LOG.info(_LI("Agent out of sync with plugin!"))
- polling_manager.force_polling()
- consecutive_resyncs = consecutive_resyncs + 1
- if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
- LOG.warn(_LW("Clearing cache of registered ports, retrials"
- " to resync were > %s"),
- constants.MAX_DEVICE_RETRIES)
- ports.clear()
- ancillary_ports.clear()
- sync = False
- consecutive_resyncs = 0
- else:
- consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows()
+ # restart the polling manager so that it will signal as added
+ # all the current ports
+ # REVISIT (rossella_s) Define a method "reset" in
+ # BasePollingManager that will be implemented by AlwaysPoll as
+ # no action and by InterfacePollingMinimizer as start/stop
+ if isinstance(
+ polling_manager, linux_polling.InterfacePollingMinimizer):
+ polling_manager.stop()
+ polling_manager.start()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
- if self._agent_has_updates(polling_manager) or ovs_restarted:
+ if self._agent_has_updates(polling_manager) or sync:
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
- reg_ports = (set() if ovs_restarted else ports)
- port_info = self.scan_ports(reg_ports, sync,
- updated_ports_copy)
+ port_info, ancillary_port_info, consecutive_resyncs = (
+ self.process_port_info(
+ start, polling_manager, sync, ovs_restarted,
+ ports, ancillary_ports, updated_ports_copy,
+ consecutive_resyncs)
+ )
+
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
- # Treat ancillary devices if they exist
- if self.ancillary_brs:
- ancillary_port_info = self.scan_ancillary_ports(
- ancillary_ports, sync)
- LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
- "ancillary port info retrieved. "
- "Elapsed:%(elapsed).3f",
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
+ agent.ancillary_brs = None
return agent
- def start_agent(self, agent, unplug_ports=None):
+ def _mock_get_events(self, agent, polling_manager, ports):
+ get_events = polling_manager.get_events
+ p_ids = [p['id'] for p in ports]
+
+ def filter_events():
+ events = get_events()
+ filtered_ports = []
+ for dev in events['added']:
+ iface_id = agent.int_br.portid_from_external_ids(
+ dev.get('external_ids', []))
+ if iface_id in p_ids:
+ # if the event is not about a port that was created by
+ # this test, we filter the event out. Since these tests are
+ # not run in isolation processing all the events might make
+ # some test fail ( e.g. the agent might keep resycing
+ # because it keeps finding not ready ports that are created
+ # by other tests)
+ filtered_ports.append(dev)
+ return {'added': filtered_ports, 'removed': events['removed']}
+ polling_manager.get_events = mock.Mock(side_effect=filter_events)
+
+ def start_agent(self, agent, ports=None, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
+ if ports is None:
+ ports = []
self.setup_agent_rpc_mocks(agent, unplug_ports)
polling_manager = polling.InterfacePollingMinimizer()
+ self._mock_get_events(agent, polling_manager, ports)
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
+ return polling_manager
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
+ self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
- self.start_agent(self.agent)
+ self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = self._create_test_network_dict()
- self.ports = port_dicts
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)
port_dicts=self.create_test_ports())
self.wait_until_ports_state(self.ports, up=True)
self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
- # OVS restarted, the agent should reprocess all the ports
self.agent.plugin_rpc.update_device_list.reset_mock()
self.wait_until_ports_state(self.ports, up=True)
updated_ports)
self.assertEqual(expected, actual)
+ def test_process_ports_events_returns_current_for_unchanged_ports(self):
+ with mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+ events = {'added': [], 'removed': []}
+ registered_ports = {1, 3}
+ ancillary_ports = {2, 5}
+ expected_ports = {'current': registered_ports, 'added': set(),
+ 'removed': set()}
+ expected_ancillary = {'current': ancillary_ports, 'added': set(),
+ 'removed': set()}
+ actual = self.agent.process_ports_events(events, registered_ports,
+ ancillary_ports)
+ self.assertEqual((expected_ports, expected_ancillary), actual)
+
+ def test_process_port_events_returns_port_changes(self):
+ events = {'added': [{'name': 'port3', 'ofport': 3,
+ 'external_ids': {'attached-mac': 'test-mac'}},
+ {'name': 'qg-port2', 'ofport': 5,
+ 'external_ids': {'attached-mac': 'test-mac'}}],
+ 'removed': [{'name': 'port2', 'ofport': 2,
+ 'external_ids': {'attached-mac': 'test-mac'}},
+ {'name': 'qg-port1', 'ofport': 4,
+ 'external_ids': {'attached-mac': 'test-mac'}}]}
+ registered_ports = {1, 2}
+ ancillary_ports = {4}
+ expected_ports = dict(
+ current={1, 3}, added={3}, removed={2})
+ expected_ancillary_ports = dict(
+ current={5}, added={5}, removed={4})
+ ancillary_bridge = mock.Mock()
+ ancillary_bridge.get_vif_port_set.return_value = {4, 5}
+ self.agent.ancillary_brs = [ancillary_bridge]
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[3, 5, 2, 4]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+ actual = self.agent.process_ports_events(
+ events, registered_ports, ancillary_ports)
+ self.assertEqual(
+ (expected_ports, expected_ancillary_ports), actual)
+
+ def _test_process_port_events_with_updated_ports(self, updated_ports):
+ events = {'added': [{'name': 'port3', 'ofport': 3,
+ 'external_ids': {'attached-mac': 'test-mac'}},
+ {'name': 'qg-port2', 'ofport': 6,
+ 'external_ids': {'attached-mac': 'test-mac'}}],
+ 'removed': [{'name': 'port2', 'ofport': 2,
+ 'external_ids': {'attached-mac': 'test-mac'}},
+ {'name': 'qg-port1', 'ofport': 5,
+ 'external_ids': {'attached-mac': 'test-mac'}}]}
+ registered_ports = {1, 2, 4}
+ ancillary_ports = {5, 8}
+ expected_ports = dict(current={1, 3, 4}, added={3},
+ removed={2}, updated={4})
+ expected_ancillary = dict(current={6, 8}, added={6},
+ removed={5})
+ ancillary_bridge = mock.Mock()
+ ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
+ self.agent.ancillary_brs = [ancillary_bridge]
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[3, 6, 2, 5]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+
+ actual = self.agent.process_ports_events(
+ events, registered_ports, ancillary_ports, updated_ports)
+ self.assertEqual((expected_ports, expected_ancillary), actual)
+
+ def test_process_port_events_finds_known_updated_ports(self):
+ self._test_process_port_events_with_updated_ports({4})
+
+ def test_process_port_events_ignores_unknown_updated_ports(self):
+ # the port '5' was not seen on current ports. Hence it has either
+ # never been wired or already removed and should be ignored
+ self._test_process_port_events_with_updated_ports({4, 5})
+
+ def test_process_port_events_ignores_updated_port_if_removed(self):
+ events = {'added': [{'name': 'port3', 'ofport': 3,
+ 'external_ids': {'attached-mac': 'test-mac'}}],
+ 'removed': [{'name': 'port2', 'ofport': 2,
+ 'external_ids': {'attached-mac': 'test-mac'}}]}
+ registered_ports = {1, 2}
+ updated_ports = {1, 2}
+ expected_ports = dict(current={1, 3}, added={3},
+ removed={2}, updated={1})
+ expected_ancillary = dict(current=set(), added=set(), removed=set())
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[3, 2]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+
+ actual = self.agent.process_ports_events(
+ events, registered_ports, None, updated_ports)
+ self.assertEqual((expected_ports, expected_ancillary), actual)
+
+ def test_process_port_events_no_vif_changes_return_updated_port_only(self):
+ events = {'added': [], 'removed': []}
+ registered_ports = {1, 2, 3}
+ updated_ports = {2}
+ expected_ports = dict(current=registered_ports, updated={2},
+ added=set(), removed=set())
+ expected_ancillary = dict(current=set(), added=set(), removed=set())
+ with mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+ actual = self.agent.process_ports_events(
+ events, registered_ports, None, updated_ports)
+ self.assertEqual((expected_ports, expected_ancillary), actual)
+
+ def test_process_port_events_ignores_removed_port_if_never_added(self):
+ events = {'added': [],
+ 'removed': [{'name': 'port2', 'ofport': 2,
+ 'external_ids': {'attached-mac': 'test-mac'}}]}
+ registered_ports = {1}
+ expected_ports = dict(current=registered_ports, added=set(),
+ removed=set())
+ expected_ancillary = dict(current=set(), added=set(), removed=set())
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[2]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+ actual = self.agent.process_ports_events(events, registered_ports,
+ None)
+ self.assertEqual((expected_ports, expected_ancillary), actual)
+
def test_update_ports_returns_changed_vlan(self):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
'added': set([]),
'removed': set(['tap0'])}
+ reply_ancillary = {'current': set([]),
+ 'added': set([]),
+ 'removed': set([])}
+
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
+ mock.patch.object(async_process.AsyncProcess, "start"),\
+ mock.patch.object(async_process.AsyncProcess, "stop"),\
mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'scan_ports') as scan_ports,\
+ 'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
- scan_ports.side_effect = [reply2, reply3]
+ process_p_events.side_effect = [(reply2, reply_ancillary),
+ (reply3, reply_ancillary)]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
except Exception:
pass
- scan_ports.assert_has_calls([
- mock.call(set(), True, set()),
- mock.call(set(), False, set())
+ process_p_events.assert_has_calls([
+ mock.call({'removed': [], 'added': []}, set(), None, set()),
+ mock.call({'removed': [], 'added': []}, set(['tap0']), None,
+ set())
])
+
process_network_ports.assert_has_calls([
mock.call(reply2, False),
mock.call(reply3, True)
self._verify_mock_calls()
def test_daemon_loop(self):
- reply2 = {'current': set(['tap0']),
- 'added': set(['tap2']),
- 'removed': set([])}
+ reply_ge_1 = {'added': set(['tap0']),
+ 'removed': set([])}
- reply3 = {'current': set(['tap2']),
- 'added': set([]),
+ reply_ge_2 = {'added': set([]),
'removed': set(['tap0'])}
+ reply_pe_1 = {'current': set(['tap0']),
+ 'added': set(['tap0']),
+ 'removed': set([])}
+
+ reply_pe_2 = {'current': set([]),
+ 'added': set([]),
+ 'removed': set(['tap0'])}
+
+ reply_ancillary = {'current': set([]),
+ 'added': set([]),
+ 'removed': set([])}
+
self.mock_int_bridge_expected += [
mock.call.check_canary_table(),
mock.call.check_canary_table()
with mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'scan_ports') as scan_ports,\
+ 'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
- scan_ports.side_effect = [reply2, reply3]
update_stale.return_value = []
+ process_p_events.side_effect = [
+ (reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
+ interface_polling = mock.Mock()
+ interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
- n_agent.daemon_loop()
+ n_agent.rpc_loop(interface_polling)
except Exception:
pass
# messages
log_exception.assert_called_once_with(
"Error while processing VIF ports")
- scan_ports.assert_has_calls([
- mock.call(set(), True, set()),
- mock.call(set(['tap0']), False, set())
+ process_p_events.assert_has_calls([
+ mock.call(reply_ge_1, set(), None, set()),
+ mock.call(reply_ge_2, set(['tap0']), None, set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
- 'added': set(['tap2'])}, False),
- mock.call({'current': set(['tap2']),
- 'removed': set(['tap0']),
- 'added': set([])}, False)
+ 'added': set(['tap0'])}, False),
])
cleanup.assert_called_once_with()