from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
+from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
+from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
agent_conf = self.conf.AGENT
ovs_conf = self.conf.OVS
- self.fullsync = True
+ self.fullsync = False
# init bridge classes with configured datapath type.
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
functools.partial(bridge_classes[b],
port_info['removed'] = registered_ports - cur_ports
return port_info
+ def process_ports_events(self, events, registered_ports, ancillary_ports,
+ old_ports_not_ready, updated_ports=None):
+ port_info = {}
+ port_info['added'] = set()
+ port_info['removed'] = set()
+ port_info['current'] = registered_ports
+
+ ancillary_port_info = {}
+ ancillary_port_info['added'] = set()
+ ancillary_port_info['removed'] = set()
+ ancillary_port_info['current'] = ancillary_ports
+ ports_not_ready_yet = set()
+
+ # if a port was added and then removed or viceversa since the agent
+ # can't know the order of the operations, check the status of the port
+ # to determine if the port was added or deleted
+ ports_removed_and_added = [
+ p for p in events['added'] if p in events['removed']]
+ for p in ports_removed_and_added:
+ if ovs_lib.BaseOVS().port_exists(p['name']):
+ events['removed'].remove(p)
+ else:
+ events['added'].remove(p)
+
+ #TODO(rossella_s): scanning the ancillary bridge won't be needed
+ # anymore when https://review.openstack.org/#/c/203381 since the bridge
+ # id stored in external_ids will be used to identify the bridge the
+ # port belongs to
+ cur_ancillary_ports = set()
+ for bridge in self.ancillary_brs:
+ cur_ancillary_ports |= bridge.get_vif_port_set()
+ cur_ancillary_ports |= ancillary_port_info['current']
+
+ def _process_port(port, ports, ancillary_ports):
+ # check 'iface-id' is set otherwise is not a port
+ # the agent should care about
+ if 'attached-mac' in port.get('external_ids', []):
+ iface_id = self.int_br.portid_from_external_ids(
+ port['external_ids'])
+ if iface_id:
+ if port['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
+ LOG.debug("Port %s not ready yet on the bridge",
+ iface_id)
+ ports_not_ready_yet.add(port['name'])
+ return
+ # check if port belongs to ancillary bridge
+ if iface_id in cur_ancillary_ports:
+ ancillary_ports.add(iface_id)
+ else:
+ ports.add(iface_id)
+ if old_ports_not_ready:
+ old_ports_not_ready_attrs = self.int_br.get_ports_attributes(
+ 'Interface', columns=['name', 'external_ids', 'ofport'],
+ ports=old_ports_not_ready, if_exists=True)
+ now_ready_ports = set(
+ [p['name'] for p in old_ports_not_ready_attrs])
+ LOG.debug("Ports %s are now ready", now_ready_ports)
+ old_ports_not_ready_yet = old_ports_not_ready - now_ready_ports
+ removed_ports = set([p['name'] for p in events['removed']])
+ old_ports_not_ready_yet -= removed_ports
+ LOG.debug("Ports %s were not ready at last iteration and are not "
+ "ready yet", old_ports_not_ready_yet)
+ ports_not_ready_yet |= old_ports_not_ready_yet
+ events['added'].extend(old_ports_not_ready_attrs)
+
+ for port in events['added']:
+ _process_port(port, port_info['added'],
+ ancillary_port_info['added'])
+ for port in events['removed']:
+ _process_port(port, port_info['removed'],
+ ancillary_port_info['removed'])
+
+ if updated_ports is None:
+ updated_ports = set()
+ updated_ports.update(self.check_changed_vlans())
+
+ # Disregard devices that were never noticed by the agent
+ port_info['removed'] &= port_info['current']
+ port_info['current'] |= port_info['added']
+ port_info['current'] -= port_info['removed']
+
+ ancillary_port_info['removed'] &= ancillary_port_info['current']
+ ancillary_port_info['current'] |= ancillary_port_info['added']
+ ancillary_port_info['current'] -= ancillary_port_info['removed']
+
+ if updated_ports:
+ # Some updated ports might have been removed in the
+ # meanwhile, and therefore should not be processed.
+ # In this case the updated port won't be found among
+ # current ports.
+ updated_ports &= port_info['current']
+ port_info['updated'] = updated_ports
+ return port_info, ancillary_port_info, ports_not_ready_yet
+
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
+ def process_port_info(self, start, polling_manager, sync, ovs_restarted,
+ ports, ancillary_ports, updated_ports_copy,
+ consecutive_resyncs, ports_not_ready_yet):
+ # There are polling managers that don't have get_events, e.g.
+ # AlwaysPoll used by windows implementations
+ # REVISIT (rossella_s) This needs to be reworked to hide implementation
+ # details regarding polling in BasePollingManager subclasses
+ if sync or not (hasattr(polling_manager, 'get_events')):
+ if sync:
+ LOG.info(_LI("Agent out of sync with plugin!"))
+ consecutive_resyncs = consecutive_resyncs + 1
+ if (consecutive_resyncs >=
+ constants.MAX_DEVICE_RETRIES):
+ LOG.warn(_LW(
+ "Clearing cache of registered ports,"
+ " retries to resync were > %s"),
+ constants.MAX_DEVICE_RETRIES)
+ ports.clear()
+ ancillary_ports.clear()
+ consecutive_resyncs = 0
+ else:
+ consecutive_resyncs = 0
+
+ # NOTE(rossella_s) don't empty the queue of events
+ # calling polling_manager.get_events() since
+ # the agent might miss some event (for example a port
+ # deletion)
+ reg_ports = (set() if ovs_restarted else ports)
+ port_info = self.scan_ports(reg_ports, sync,
+ updated_ports_copy)
+ # Treat ancillary devices if they exist
+ if self.ancillary_brs:
+ ancillary_port_info = self.scan_ancillary_ports(
+ ancillary_ports, sync)
+ LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
+ " - ancillary port info retrieved. "
+ "Elapsed:%(elapsed).3f",
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ else:
+ ancillary_port_info = {}
+
+ else:
+ consecutive_resyncs = 0
+ events = polling_manager.get_events()
+ port_info, ancillary_port_info, ports_not_ready_yet = (
+ self.process_ports_events(events, ports, ancillary_ports,
+ ports_not_ready_yet,
+ updated_ports_copy))
+ return (port_info, ancillary_port_info, consecutive_resyncs,
+ ports_not_ready_yet)
+
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
- sync = True
+
+ sync = False
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
ovs_restarted = False
consecutive_resyncs = 0
need_clean_stale_flow = True
+ ports_not_ready_yet = set()
while self._check_and_handle_signal():
if self.fullsync:
LOG.info(_LI("rpc_loop doing a full sync."))
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
- if sync:
- LOG.info(_LI("Agent out of sync with plugin!"))
- polling_manager.force_polling()
- consecutive_resyncs = consecutive_resyncs + 1
- if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
- LOG.warn(_LW("Clearing cache of registered ports, retrials"
- " to resync were > %s"),
- constants.MAX_DEVICE_RETRIES)
- ports.clear()
- ancillary_ports.clear()
- sync = False
- consecutive_resyncs = 0
- else:
- consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows()
+ # restart the polling manager so that it will signal as added
+ # all the current ports
+ # REVISIT (rossella_s) Define a method "reset" in
+ # BasePollingManager that will be implemented by AlwaysPoll as
+ # no action and by InterfacePollingMinimizer as start/stop
+ if isinstance(
+ polling_manager, linux_polling.InterfacePollingMinimizer):
+ # There's a possible race here, when ovsdb-server is
+ # restarted ovsdb monitor will also be restarted
+ try:
+ polling_manager.stop()
+ except async_process.AsyncProcessException:
+ LOG.debug("OVSDB monitor was not running")
+ polling_manager.start()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
- if self._agent_has_updates(polling_manager) or ovs_restarted:
+ if (self._agent_has_updates(polling_manager) or sync
+ or ports_not_ready_yet):
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
- reg_ports = (set() if ovs_restarted else ports)
- port_info = self.scan_ports(reg_ports, sync,
- updated_ports_copy)
+ (port_info, ancillary_port_info, consecutive_resyncs,
+ ports_not_ready_yet) = (self.process_port_info(
+ start, polling_manager, sync, ovs_restarted,
+ ports, ancillary_ports, updated_ports_copy,
+ consecutive_resyncs, ports_not_ready_yet)
+ )
+ sync = False
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
- # Treat ancillary devices if they exist
- if self.ancillary_brs:
- ancillary_port_info = self.scan_ancillary_ports(
- ancillary_ports, sync)
- LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
- "ancillary port info retrieved. "
- "Elapsed:%(elapsed).3f",
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
- sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
+ agent.ancillary_brs = []
return agent
- def start_agent(self, agent, unplug_ports=None):
+ def _mock_get_events(self, agent, polling_manager, ports):
+ get_events = polling_manager.get_events
+ p_ids = [p['id'] for p in ports]
+
+ def filter_events():
+ events = get_events()
+ filtered_ports = []
+ for dev in events['added']:
+ iface_id = agent.int_br.portid_from_external_ids(
+ dev.get('external_ids', []))
+ if iface_id in p_ids:
+ # if the event is not about a port that was created by
+ # this test, we filter the event out. Since these tests are
+ # not run in isolation processing all the events might make
+ # some test fail ( e.g. the agent might keep resycing
+ # because it keeps finding not ready ports that are created
+ # by other tests)
+ filtered_ports.append(dev)
+ return {'added': filtered_ports, 'removed': events['removed']}
+ polling_manager.get_events = mock.Mock(side_effect=filter_events)
+
+ def start_agent(self, agent, ports=None, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
+ if ports is None:
+ ports = []
self.setup_agent_rpc_mocks(agent, unplug_ports)
polling_manager = polling.InterfacePollingMinimizer()
+ self._mock_get_events(agent, polling_manager, ports)
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
+ return polling_manager
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
+ self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
- self.start_agent(self.agent)
+ self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = self._create_test_network_dict()
- self.ports = port_dicts
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)
self.agent = self.create_agent(create_tunnels=False)
self.network = self._create_test_network_dict()
self._plug_ports(self.network, self.ports, self.agent)
- self.start_agent(self.agent, unplug_ports=[self.ports[1]])
+ self.start_agent(self.agent, ports=self.ports,
+ unplug_ports=[self.ports[1]])
self.wait_until_ports_state([self.ports[0]], up=True)
self.assertRaises(
Timeout, self.wait_until_ports_state, [self.ports[1]], up=True,
updated_ports)
self.assertEqual(expected, actual)
+ def _test_process_ports_events(self, events, registered_ports,
+ ancillary_ports, expected_ports,
+ expected_ancillary, updated_ports=None):
+ with mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+ devices_not_ready_yet = set()
+ actual = self.agent.process_ports_events(
+ events, registered_ports, ancillary_ports,
+ devices_not_ready_yet, updated_ports)
+ self.assertEqual(
+ (expected_ports, expected_ancillary, devices_not_ready_yet),
+ actual)
+
+ def test_process_ports_events_returns_current_for_unchanged_ports(self):
+ events = {'added': [], 'removed': []}
+ registered_ports = {1, 3}
+ ancillary_ports = {2, 5}
+ expected_ports = {'current': registered_ports, 'added': set(),
+ 'removed': set()}
+ expected_ancillary = {'current': ancillary_ports, 'added': set(),
+ 'removed': set()}
+ self._test_process_ports_events(events, registered_ports,
+ ancillary_ports, expected_ports,
+ expected_ancillary)
+
+ def test_process_port_events_no_vif_changes_return_updated_port_only(self):
+ events = {'added': [], 'removed': []}
+ registered_ports = {1, 2, 3}
+ updated_ports = {2}
+ expected_ports = dict(current=registered_ports, updated={2},
+ added=set(), removed=set())
+ expected_ancillary = dict(current=set(), added=set(), removed=set())
+ self._test_process_ports_events(events, registered_ports,
+ set(), expected_ports,
+ expected_ancillary, updated_ports)
+
+ def test_process_port_events_ignores_removed_port_if_never_added(self):
+ events = {'added': [],
+ 'removed': [{'name': 'port2', 'ofport': 2,
+ 'external_ids': {'attached-mac': 'test-mac'}}]}
+ registered_ports = {1}
+ expected_ports = dict(current=registered_ports, added=set(),
+ removed=set())
+ expected_ancillary = dict(current=set(), added=set(), removed=set())
+ devices_not_ready_yet = set()
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[2]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+ actual = self.agent.process_ports_events(
+ events, registered_ports, set(), devices_not_ready_yet)
+ self.assertEqual(
+ (expected_ports, expected_ancillary, devices_not_ready_yet),
+ actual)
+
+ def test_process_port_events_port_not_ready_yet(self):
+ events = {'added': [{'name': 'port5', 'ofport': [],
+ 'external_ids': {'attached-mac': 'test-mac'}}],
+ 'removed': []}
+ old_devices_not_ready = {'port4'}
+ registered_ports = set([1, 2, 3])
+ expected_ports = dict(current=set([1, 2, 3, 4]),
+ added=set([4]), removed=set())
+ self.agent.ancillary_brs = []
+ expected_ancillary = dict(current=set(), added=set(), removed=set())
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[5, 4]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()), \
+ mock.patch.object(self.agent.int_br, 'get_ports_attributes',
+ return_value=[{'name': 'port4', 'ofport': 4,
+ 'external_ids': {
+ 'attached-mac': 'mac4'}}]):
+ expected_devices_not_ready = {'port5'}
+ actual = self.agent.process_ports_events(
+ events, registered_ports, set(), old_devices_not_ready)
+ self.assertEqual(
+ (expected_ports, expected_ancillary,
+ expected_devices_not_ready),
+ actual)
+
+ def _test_process_port_events_with_updated_ports(self, updated_ports):
+ events = {'added': [{'name': 'port3', 'ofport': 3,
+ 'external_ids': {'attached-mac': 'test-mac'}},
+ {'name': 'qg-port2', 'ofport': 6,
+ 'external_ids': {'attached-mac': 'test-mac'}}],
+ 'removed': [{'name': 'port2', 'ofport': 2,
+ 'external_ids': {'attached-mac': 'test-mac'}},
+ {'name': 'qg-port1', 'ofport': 5,
+ 'external_ids': {'attached-mac': 'test-mac'}}]}
+ registered_ports = {1, 2, 4}
+ ancillary_ports = {5, 8}
+ expected_ports = dict(current={1, 3, 4}, added={3}, removed={2})
+ if updated_ports:
+ expected_ports['updated'] = updated_ports
+ expected_ancillary = dict(current={6, 8}, added={6},
+ removed={5})
+ ancillary_bridge = mock.Mock()
+ ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
+ self.agent.ancillary_brs = [ancillary_bridge]
+ with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
+ side_effect=[3, 6, 2, 5]), \
+ mock.patch.object(self.agent, 'check_changed_vlans',
+ return_value=set()):
+
+ devices_not_ready_yet = set()
+ actual = self.agent.process_ports_events(
+ events, registered_ports, ancillary_ports,
+ devices_not_ready_yet, updated_ports)
+ self.assertEqual(
+ (expected_ports, expected_ancillary, devices_not_ready_yet),
+ actual)
+
+ def test_process_port_events_returns_port_changes(self):
+ self._test_process_port_events_with_updated_ports(set())
+
+ def test_process_port_events_finds_known_updated_ports(self):
+ self._test_process_port_events_with_updated_ports({4})
+
+ def test_process_port_events_ignores_unknown_updated_ports(self):
+ # the port '10' was not seen on current ports. Hence it has either
+ # never been wired or already removed and should be ignored
+ self._test_process_port_events_with_updated_ports({4, 10})
+
+ def test_process_port_events_ignores_updated_port_if_removed(self):
+ self._test_process_port_events_with_updated_ports({4, 5})
+
def test_update_ports_returns_changed_vlan(self):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
'added': set([]),
'removed': set(['tap0'])}
+ reply_ancillary = {'current': set([]),
+ 'added': set([]),
+ 'removed': set([])}
+
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
+ mock.patch.object(async_process.AsyncProcess, "start"),\
+ mock.patch.object(async_process.AsyncProcess, "stop"),\
mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'scan_ports') as scan_ports,\
+ 'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
- scan_ports.side_effect = [reply2, reply3]
+ devices_not_ready = set()
+ process_p_events.side_effect = [(reply2, reply_ancillary,
+ devices_not_ready),
+ (reply3, reply_ancillary,
+ devices_not_ready)]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
except Exception:
pass
- scan_ports.assert_has_calls([
- mock.call(set(), True, set()),
- mock.call(set(), False, set())
+ process_p_events.assert_has_calls([
+ mock.call({'removed': [], 'added': []}, set(), set(), set(),
+ set()),
+ mock.call({'removed': [], 'added': []}, set(['tap0']), set(),
+ set(), set())
])
+
process_network_ports.assert_has_calls([
mock.call(reply2, False),
mock.call(reply3, True)
self._verify_mock_calls()
def test_daemon_loop(self):
- reply2 = {'current': set(['tap0']),
- 'added': set(['tap2']),
- 'removed': set([])}
+ reply_ge_1 = {'added': set(['tap0']),
+ 'removed': set([])}
- reply3 = {'current': set(['tap2']),
- 'added': set([]),
+ reply_ge_2 = {'added': set([]),
'removed': set(['tap0'])}
+ reply_pe_1 = {'current': set(['tap0']),
+ 'added': set(['tap0']),
+ 'removed': set([])}
+
+ reply_pe_2 = {'current': set([]),
+ 'added': set([]),
+ 'removed': set(['tap0'])}
+
+ reply_ancillary = {'current': set([]),
+ 'added': set([]),
+ 'removed': set([])}
+
self.mock_int_bridge_expected += [
mock.call.check_canary_table(),
mock.call.check_canary_table()
with mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'scan_ports') as scan_ports,\
+ 'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
- scan_ports.side_effect = [reply2, reply3]
update_stale.return_value = []
+ devices_not_ready = set()
+ process_p_events.side_effect = [
+ (reply_pe_1, reply_ancillary, devices_not_ready),
+ (reply_pe_2, reply_ancillary, devices_not_ready)]
+ interface_polling = mock.Mock()
+ interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
- n_agent.daemon_loop()
+ n_agent.rpc_loop(interface_polling)
except Exception:
pass
# messages
log_exception.assert_called_once_with(
"Error while processing VIF ports")
- scan_ports.assert_has_calls([
- mock.call(set(), True, set()),
- mock.call(set(['tap0']), False, set())
+ process_p_events.assert_has_calls([
+ mock.call(reply_ge_1, set(), set(), devices_not_ready, set()),
+ mock.call(reply_ge_2, set(['tap0']), set(), devices_not_ready,
+ set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
- 'added': set(['tap2'])}, False),
- mock.call({'current': set(['tap2']),
- 'removed': set(['tap0']),
- 'added': set([])}, False)
+ 'added': set(['tap0'])}, False),
])
cleanup.assert_called_once_with()