from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import ip_lib
-from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
port_info['removed'] = registered_ports - cur_ports
return port_info
- def process_ports_events(self, events, registered_ports, ancillary_ports,
- updated_ports=None):
- port_info = {}
- port_info['added'] = set()
- port_info['removed'] = set()
- port_info['current'] = registered_ports
-
- ancillary_port_info = {}
- ancillary_port_info['added'] = set()
- ancillary_port_info['removed'] = set()
- ancillary_port_info['current'] = (
- ancillary_ports if ancillary_ports else set())
-
- # if a port was added and then removed or viceversa since the agent
- # can't know the order of the operations, check the status of the port
- # to determine if the port was added or deleted
- device_removed_or_added = [
- dev for dev in events['added'] if dev in events['removed']]
- for device in device_removed_or_added:
- if ovs_lib.BaseOVS().port_exists(device['name']):
- events['removed'].remove(device)
- else:
- events['added'].remove(device)
-
- #TODO(rossella_s): scanning the ancillary bridge won't be needed
- # anymore when https://review.openstack.org/#/c/203381 since the bridge
- # id stored in external_ids will be used to identify the bridge the
- # port belongs to
- cur_ancillary_ports = set()
- for bridge in self.ancillary_brs:
- cur_ancillary_ports |= bridge.get_vif_port_set()
- cur_ancillary_ports |= ancillary_port_info['current']
-
- def _process_device(device, devices, ancillary_devices):
- # check 'iface-id' is set otherwise is not a port
- # the agent should care about
- if 'attached-mac' in device.get('external_ids', []):
- iface_id = self.int_br.portid_from_external_ids(
- device['external_ids'])
- if iface_id:
- if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
- #TODO(rossella_s) it's extreme to trigger a full resync
- # if a port is not ready, resync only the device that
- # is not ready
- raise Exception(
- _("Port %s is not ready, resync needed") % device[
- 'name'])
- # check if device belong to ancillary bridge
- if iface_id in cur_ancillary_ports:
- ancillary_devices.add(iface_id)
- else:
- devices.add(iface_id)
-
- for device in events['added']:
- _process_device(device, port_info['added'],
- ancillary_port_info['added'])
- for device in events['removed']:
- _process_device(device, port_info['removed'],
- ancillary_port_info['removed'])
-
- if updated_ports is None:
- updated_ports = set()
- updated_ports.update(self.check_changed_vlans())
-
- # Disregard devices that were never noticed by the agent
- port_info['removed'] &= port_info['current']
- port_info['current'] |= port_info['added']
- port_info['current'] -= port_info['removed']
-
- ancillary_port_info['removed'] &= ancillary_port_info['current']
- ancillary_port_info['current'] |= ancillary_port_info['added']
- ancillary_port_info['current'] -= ancillary_port_info['removed']
-
- if updated_ports:
- # Some updated ports might have been removed in the
- # meanwhile, and therefore should not be processed.
- # In this case the updated port won't be found among
- # current ports.
- updated_ports &= port_info['current']
- port_info['updated'] = updated_ports
- return port_info, ancillary_port_info
-
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
- def process_port_info(self, start, polling_manager, sync, ovs_restarted,
- ports, ancillary_ports, updated_ports_copy,
- consecutive_resyncs):
- # There are polling managers that don't have get_events, e.g.
- # AlwaysPoll used by windows implementations
- # REVISIT (rossella_s) This needs to be reworked to hide implementation
- # details regarding polling in BasePollingManager subclasses
- if sync or not (hasattr(polling_manager, 'get_events')):
- if sync:
- LOG.info(_LI("Agent out of sync with plugin!"))
- consecutive_resyncs = consecutive_resyncs + 1
- if (consecutive_resyncs >=
- constants.MAX_DEVICE_RETRIES):
- LOG.warn(_LW(
- "Clearing cache of registered ports,"
- " retries to resync were > %s"),
- constants.MAX_DEVICE_RETRIES)
- ports.clear()
- ancillary_ports.clear()
- consecutive_resyncs = 0
- else:
- consecutive_resyncs = 0
-
- # NOTE(rossella_s) don't empty the queue of events
- # calling polling_manager.get_events() since
- # the agent might miss some event (for example a port
- # deletion)
- reg_ports = (set() if ovs_restarted else ports)
- port_info = self.scan_ports(reg_ports, sync,
- updated_ports_copy)
- # Treat ancillary devices if they exist
- if self.ancillary_brs:
- ancillary_port_info = self.scan_ancillary_ports(
- ancillary_ports, sync)
- LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
- " - ancillary port info retrieved. "
- "Elapsed:%(elapsed).3f",
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- else:
- ancillary_port_info = {}
-
- else:
- consecutive_resyncs = 0
- events = polling_manager.get_events()
- ancillary_ports = (
- ancillary_ports if self.ancillary_brs else None)
- port_info, ancillary_port_info = (
- self.process_ports_events(events, ports,
- ancillary_ports, updated_ports_copy))
- return port_info, ancillary_port_info, consecutive_resyncs
-
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
- sync = False
+ sync = True
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
+ if sync:
+ LOG.info(_LI("Agent out of sync with plugin!"))
+ polling_manager.force_polling()
+ consecutive_resyncs = consecutive_resyncs + 1
+ if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
+ LOG.warn(_LW("Clearing cache of registered ports, retrials"
+ " to resync were > %s"),
+ constants.MAX_DEVICE_RETRIES)
+ ports.clear()
+ ancillary_ports.clear()
+ sync = False
+ consecutive_resyncs = 0
+ else:
+ consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows()
- # restart the polling manager so that it will signal as added
- # all the current ports
- # REVISIT (rossella_s) Define a method "reset" in
- # BasePollingManager that will be implemented by AlwaysPoll as
- # no action and by InterfacePollingMinimizer as start/stop
- if isinstance(
- polling_manager, linux_polling.InterfacePollingMinimizer):
- polling_manager.stop()
- polling_manager.start()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
- if self._agent_has_updates(polling_manager) or sync:
+ if self._agent_has_updates(polling_manager) or ovs_restarted:
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
- port_info, ancillary_port_info, consecutive_resyncs = (
- self.process_port_info(
- start, polling_manager, sync, ovs_restarted,
- ports, ancillary_ports, updated_ports_copy,
- consecutive_resyncs)
- )
-
+ reg_ports = (set() if ovs_restarted else ports)
+ port_info = self.scan_ports(reg_ports, sync,
+ updated_ports_copy)
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
+ # Treat ancillary devices if they exist
+ if self.ancillary_brs:
+ ancillary_port_info = self.scan_ancillary_ports(
+ ancillary_ports, sync)
+ LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
+ "ancillary port info retrieved. "
+ "Elapsed:%(elapsed).3f",
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
- agent.ancillary_brs = None
return agent
- def _mock_get_events(self, agent, polling_manager, ports):
- get_events = polling_manager.get_events
- p_ids = [p['id'] for p in ports]
-
- def filter_events():
- events = get_events()
- filtered_ports = []
- for dev in events['added']:
- iface_id = agent.int_br.portid_from_external_ids(
- dev.get('external_ids', []))
- if iface_id in p_ids:
- # if the event is not about a port that was created by
- # this test, we filter the event out. Since these tests are
- # not run in isolation processing all the events might make
- # some test fail ( e.g. the agent might keep resycing
- # because it keeps finding not ready ports that are created
- # by other tests)
- filtered_ports.append(dev)
- return {'added': filtered_ports, 'removed': events['removed']}
- polling_manager.get_events = mock.Mock(side_effect=filter_events)
-
- def start_agent(self, agent, ports=None, unplug_ports=None):
+ def start_agent(self, agent, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
- if ports is None:
- ports = []
self.setup_agent_rpc_mocks(agent, unplug_ports)
polling_manager = polling.InterfacePollingMinimizer()
- self._mock_get_events(agent, polling_manager, ports)
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
- return polling_manager
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
- self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
- self.polling_manager = self.start_agent(self.agent, ports=self.ports)
+ self.start_agent(self.agent)
self.network = self._create_test_network_dict()
+ self.ports = port_dicts
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)
port_dicts=self.create_test_ports())
self.wait_until_ports_state(self.ports, up=True)
self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
+ # OVS restarted, the agent should reprocess all the ports
self.agent.plugin_rpc.update_device_list.reset_mock()
self.wait_until_ports_state(self.ports, up=True)
updated_ports)
self.assertEqual(expected, actual)
- def test_process_ports_events_returns_current_for_unchanged_ports(self):
- with mock.patch.object(self.agent, 'check_changed_vlans',
- return_value=set()):
- events = {'added': [], 'removed': []}
- registered_ports = {1, 3}
- ancillary_ports = {2, 5}
- expected_ports = {'current': registered_ports, 'added': set(),
- 'removed': set()}
- expected_ancillary = {'current': ancillary_ports, 'added': set(),
- 'removed': set()}
- actual = self.agent.process_ports_events(events, registered_ports,
- ancillary_ports)
- self.assertEqual((expected_ports, expected_ancillary), actual)
-
- def test_process_port_events_returns_port_changes(self):
- events = {'added': [{'name': 'port3', 'ofport': 3,
- 'external_ids': {'attached-mac': 'test-mac'}},
- {'name': 'qg-port2', 'ofport': 5,
- 'external_ids': {'attached-mac': 'test-mac'}}],
- 'removed': [{'name': 'port2', 'ofport': 2,
- 'external_ids': {'attached-mac': 'test-mac'}},
- {'name': 'qg-port1', 'ofport': 4,
- 'external_ids': {'attached-mac': 'test-mac'}}]}
- registered_ports = {1, 2}
- ancillary_ports = {4}
- expected_ports = dict(
- current={1, 3}, added={3}, removed={2})
- expected_ancillary_ports = dict(
- current={5}, added={5}, removed={4})
- ancillary_bridge = mock.Mock()
- ancillary_bridge.get_vif_port_set.return_value = {4, 5}
- self.agent.ancillary_brs = [ancillary_bridge]
- with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
- side_effect=[3, 5, 2, 4]), \
- mock.patch.object(self.agent, 'check_changed_vlans',
- return_value=set()):
- actual = self.agent.process_ports_events(
- events, registered_ports, ancillary_ports)
- self.assertEqual(
- (expected_ports, expected_ancillary_ports), actual)
-
- def _test_process_port_events_with_updated_ports(self, updated_ports):
- events = {'added': [{'name': 'port3', 'ofport': 3,
- 'external_ids': {'attached-mac': 'test-mac'}},
- {'name': 'qg-port2', 'ofport': 6,
- 'external_ids': {'attached-mac': 'test-mac'}}],
- 'removed': [{'name': 'port2', 'ofport': 2,
- 'external_ids': {'attached-mac': 'test-mac'}},
- {'name': 'qg-port1', 'ofport': 5,
- 'external_ids': {'attached-mac': 'test-mac'}}]}
- registered_ports = {1, 2, 4}
- ancillary_ports = {5, 8}
- expected_ports = dict(current={1, 3, 4}, added={3},
- removed={2}, updated={4})
- expected_ancillary = dict(current={6, 8}, added={6},
- removed={5})
- ancillary_bridge = mock.Mock()
- ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
- self.agent.ancillary_brs = [ancillary_bridge]
- with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
- side_effect=[3, 6, 2, 5]), \
- mock.patch.object(self.agent, 'check_changed_vlans',
- return_value=set()):
-
- actual = self.agent.process_ports_events(
- events, registered_ports, ancillary_ports, updated_ports)
- self.assertEqual((expected_ports, expected_ancillary), actual)
-
- def test_process_port_events_finds_known_updated_ports(self):
- self._test_process_port_events_with_updated_ports({4})
-
- def test_process_port_events_ignores_unknown_updated_ports(self):
- # the port '5' was not seen on current ports. Hence it has either
- # never been wired or already removed and should be ignored
- self._test_process_port_events_with_updated_ports({4, 5})
-
- def test_process_port_events_ignores_updated_port_if_removed(self):
- events = {'added': [{'name': 'port3', 'ofport': 3,
- 'external_ids': {'attached-mac': 'test-mac'}}],
- 'removed': [{'name': 'port2', 'ofport': 2,
- 'external_ids': {'attached-mac': 'test-mac'}}]}
- registered_ports = {1, 2}
- updated_ports = {1, 2}
- expected_ports = dict(current={1, 3}, added={3},
- removed={2}, updated={1})
- expected_ancillary = dict(current=set(), added=set(), removed=set())
- with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
- side_effect=[3, 2]), \
- mock.patch.object(self.agent, 'check_changed_vlans',
- return_value=set()):
-
- actual = self.agent.process_ports_events(
- events, registered_ports, None, updated_ports)
- self.assertEqual((expected_ports, expected_ancillary), actual)
-
- def test_process_port_events_no_vif_changes_return_updated_port_only(self):
- events = {'added': [], 'removed': []}
- registered_ports = {1, 2, 3}
- updated_ports = {2}
- expected_ports = dict(current=registered_ports, updated={2},
- added=set(), removed=set())
- expected_ancillary = dict(current=set(), added=set(), removed=set())
- with mock.patch.object(self.agent, 'check_changed_vlans',
- return_value=set()):
- actual = self.agent.process_ports_events(
- events, registered_ports, None, updated_ports)
- self.assertEqual((expected_ports, expected_ancillary), actual)
-
- def test_process_port_events_ignores_removed_port_if_never_added(self):
- events = {'added': [],
- 'removed': [{'name': 'port2', 'ofport': 2,
- 'external_ids': {'attached-mac': 'test-mac'}}]}
- registered_ports = {1}
- expected_ports = dict(current=registered_ports, added=set(),
- removed=set())
- expected_ancillary = dict(current=set(), added=set(), removed=set())
- with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
- side_effect=[2]), \
- mock.patch.object(self.agent, 'check_changed_vlans',
- return_value=set()):
- actual = self.agent.process_ports_events(events, registered_ports,
- None)
- self.assertEqual((expected_ports, expected_ancillary), actual)
-
def test_update_ports_returns_changed_vlan(self):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
'added': set([]),
'removed': set(['tap0'])}
- reply_ancillary = {'current': set([]),
- 'added': set([]),
- 'removed': set([])}
-
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
- mock.patch.object(async_process.AsyncProcess, "start"),\
- mock.patch.object(async_process.AsyncProcess, "stop"),\
mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'process_ports_events') as process_p_events,\
+ 'scan_ports') as scan_ports,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
- process_p_events.side_effect = [(reply2, reply_ancillary),
- (reply3, reply_ancillary)]
+ scan_ports.side_effect = [reply2, reply3]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
except Exception:
pass
- process_p_events.assert_has_calls([
- mock.call({'removed': [], 'added': []}, set(), None, set()),
- mock.call({'removed': [], 'added': []}, set(['tap0']), None,
- set())
+ scan_ports.assert_has_calls([
+ mock.call(set(), True, set()),
+ mock.call(set(), False, set())
])
-
process_network_ports.assert_has_calls([
mock.call(reply2, False),
mock.call(reply3, True)
self._verify_mock_calls()
def test_daemon_loop(self):
- reply_ge_1 = {'added': set(['tap0']),
- 'removed': set([])}
+ reply2 = {'current': set(['tap0']),
+ 'added': set(['tap2']),
+ 'removed': set([])}
- reply_ge_2 = {'added': set([]),
+ reply3 = {'current': set(['tap2']),
+ 'added': set([]),
'removed': set(['tap0'])}
- reply_pe_1 = {'current': set(['tap0']),
- 'added': set(['tap0']),
- 'removed': set([])}
-
- reply_pe_2 = {'current': set([]),
- 'added': set([]),
- 'removed': set(['tap0'])}
-
- reply_ancillary = {'current': set([]),
- 'added': set([]),
- 'removed': set([])}
-
self.mock_int_bridge_expected += [
mock.call.check_canary_table(),
mock.call.check_canary_table()
with mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'process_ports_events') as process_p_events,\
+ 'scan_ports') as scan_ports,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
+ scan_ports.side_effect = [reply2, reply3]
update_stale.return_value = []
- process_p_events.side_effect = [
- (reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
- interface_polling = mock.Mock()
- interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
- n_agent.rpc_loop(interface_polling)
+ n_agent.daemon_loop()
except Exception:
pass
# messages
log_exception.assert_called_once_with(
"Error while processing VIF ports")
- process_p_events.assert_has_calls([
- mock.call(reply_ge_1, set(), None, set()),
- mock.call(reply_ge_2, set(['tap0']), None, set())
+ scan_ports.assert_has_calls([
+ mock.call(set(), True, set()),
+ mock.call(set(['tap0']), False, set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
- 'added': set(['tap0'])}, False),
+ 'added': set(['tap2'])}, False),
+ mock.call({'current': set(['tap2']),
+ 'removed': set(['tap0']),
+ 'added': set([])}, False)
])
cleanup.assert_called_once_with()