self.br_mgr.remove_empty_bridges()
return resync
- def scan_devices(self, registered_devices, updated_devices):
- curr_devices = self.br_mgr.get_tap_devices()
+ def scan_devices(self, previous, sync):
device_info = {}
- device_info['current'] = curr_devices
- device_info['added'] = curr_devices - registered_devices
- # we don't want to process updates for devices that don't exist
- device_info['updated'] = updated_devices & curr_devices
- # we need to clean up after devices are removed
- device_info['removed'] = registered_devices - curr_devices
+
+ # Save and reinitialise the set variable that the port_update RPC uses.
+ # This should be thread-safe as the greenthread should not yield
+ # between these two statements.
+ updated_devices = self.updated_devices
+ self.updated_devices = set()
+
+ current_devices = self.br_mgr.get_tap_devices()
+ device_info['current'] = current_devices
+
+ if previous is None:
+ # This is the first iteration of daemon_loop().
+ previous = {'added': set(),
+ 'current': set(),
+ 'updated': set(),
+ 'removed': set()}
+
+ if sync:
+ # This is the first iteration, or the previous one had a problem.
+ # Re-add all existing devices.
+ device_info['added'] = current_devices
+
+ # Retry cleaning devices that may not have been cleaned properly.
+ # And clean any that disappeared since the previous iteration.
+ device_info['removed'] = (previous['removed'] | previous['current']
+ - current_devices)
+
+ # Retry updating devices that may not have been updated properly.
+ # And any that were updated since the previous iteration.
+ # Only update devices that currently exist.
+ device_info['updated'] = (previous['updated'] | updated_devices
+ & current_devices)
+ else:
+ device_info['added'] = current_devices - previous['current']
+ device_info['removed'] = previous['current'] - current_devices
+ device_info['updated'] = updated_devices & current_devices
+
return device_info
def _device_info_has_changes(self, device_info):
or device_info.get('removed'))
def daemon_loop(self):
- sync = True
- devices = set()
-
LOG.info(_("LinuxBridge Agent RPC Daemon Started!"))
+ device_info = None
+ sync = True
while True:
start = time.time()
+
+ device_info = self.scan_devices(previous=device_info, sync=sync)
+
if sync:
LOG.info(_("Agent out of sync with plugin!"))
- devices.clear()
sync = False
- device_info = {}
- # Save updated devices dict to perform rollback in case
- # resync would be needed, and then clear self.updated_devices.
- # As the greenthread should not yield between these
- # two statements, this will should be thread-safe.
- updated_devices_copy = self.updated_devices
- self.updated_devices = set()
- try:
- device_info = self.scan_devices(devices, updated_devices_copy)
- if self._device_info_has_changes(device_info):
- LOG.debug(_("Agent loop found changes! %s"), device_info)
- # If treat devices fails - indicates must resync with
- # plugin
+
+ if self._device_info_has_changes(device_info):
+ LOG.debug(_("Agent loop found changes! %s"), device_info)
+ try:
sync = self.process_network_devices(device_info)
- devices = device_info['current']
- except Exception:
- LOG.exception(_("Error in agent loop. Devices info: %s"),
- device_info)
- sync = True
- # Restore devices that were removed from this set earlier
- # without overwriting ones that may have arrived since.
- self.updated_devices |= updated_devices_copy
+ except Exception:
+ LOG.exception(_("Error in agent loop. Devices info: %s"),
+ device_info)
+ sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
import mock
from oslo.config import cfg
-import testtools
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
- def test_loop_restores_updated_devices_on_exception(self):
- agent = self.agent
- agent.updated_devices = set(['tap1', 'tap2'])
-
- with contextlib.nested(
- mock.patch.object(agent, 'scan_devices'),
- mock.patch.object(linuxbridge_neutron_agent.LOG, 'info'),
- mock.patch.object(agent, 'process_network_devices')
- ) as (scan_devices, log, process_network_devices):
- # Simulate effect of 2 port_update()s when loop is running.
- # And break out of loop at start of 2nd iteration.
- log.side_effect = [agent.updated_devices.add('tap3'),
- agent.updated_devices.add('tap4'),
- ValueError]
- scan_devices.side_effect = RuntimeError
-
- with testtools.ExpectedException(ValueError):
- agent.daemon_loop()
-
- # Check that the originals {tap1,tap2} have been restored
- # and the new updates {tap3, tap4} have not been overwritten.
- self.assertEqual(set(['tap1', 'tap2', 'tap3', 'tap4']),
- agent.updated_devices)
- self.assertEqual(3, log.call_count)
-
- def mock_scan_devices(self, expected, mock_current,
- registered_devices, updated_devices):
+ def _test_scan_devices(self, previous, updated,
+ fake_current, expected, sync):
self.agent.br_mgr = mock.Mock()
- self.agent.br_mgr.get_tap_devices.return_value = mock_current
+ self.agent.br_mgr.get_tap_devices.return_value = fake_current
- results = self.agent.scan_devices(registered_devices, updated_devices)
+ self.agent.updated_devices = updated
+ results = self.agent.scan_devices(previous, sync)
self.assertEqual(expected, results)
- def test_scan_devices_returns_empty_sets(self):
- registered = set()
+ def test_scan_devices_no_changes(self):
+ previous = {'current': set([1, 2]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()}
+ fake_current = set([1, 2])
updated = set()
- mock_current = set()
- expected = {'current': set(),
+ expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
- self.mock_scan_devices(expected, mock_current, registered, updated)
- def test_scan_devices_no_changes(self):
- registered = set(['tap1', 'tap2'])
- updated = set()
- mock_current = set(['tap1', 'tap2'])
- expected = {'current': set(['tap1', 'tap2']),
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=False)
+
+ def test_scan_devices_added_removed(self):
+ previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
- self.mock_scan_devices(expected, mock_current, registered, updated)
+ fake_current = set([2, 3])
+ updated = set()
+ expected = {'current': set([2, 3]),
+ 'updated': set(),
+ 'added': set([3]),
+ 'removed': set([1])}
+
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=False)
- def test_scan_devices_new_and_removed(self):
- registered = set(['tap1', 'tap2'])
+ def test_scan_devices_removed_retried_on_sync(self):
+ previous = {'current': set([2, 3]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set([1])}
+ fake_current = set([2, 3])
updated = set()
- mock_current = set(['tap2', 'tap3'])
- expected = {'current': set(['tap2', 'tap3']),
+ expected = {'current': set([2, 3]),
'updated': set(),
- 'added': set(['tap3']),
- 'removed': set(['tap1'])}
- self.mock_scan_devices(expected, mock_current, registered, updated)
-
- def test_scan_devices_new_updates(self):
- registered = set(['tap1'])
- updated = set(['tap2'])
- mock_current = set(['tap1', 'tap2'])
- expected = {'current': set(['tap1', 'tap2']),
- 'updated': set(['tap2']),
- 'added': set(['tap2']),
+ 'added': set([2, 3]),
+ 'removed': set([1])}
+
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=True)
+
+ def test_scan_devices_vanished_removed_on_sync(self):
+ previous = {'current': set([2, 3]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set([1])}
+ # Device 2 disappeared.
+ fake_current = set([3])
+ updated = set()
+ # Device 1 should be retried.
+ expected = {'current': set([3]),
+ 'updated': set(),
+ 'added': set([3]),
+ 'removed': set([1, 2])}
+
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=True)
+
+ def test_scan_devices_updated(self):
+ previous = {'current': set([1, 2]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()}
+ fake_current = set([1, 2])
+ updated = set([1])
+ expected = {'current': set([1, 2]),
+ 'updated': set([1]),
+ 'added': set(),
'removed': set()}
- self.mock_scan_devices(expected, mock_current, registered, updated)
- def test_scan_devices_updated_missing(self):
- registered = set(['tap1'])
- updated = set(['tap2'])
- mock_current = set(['tap1'])
- expected = {'current': set(['tap1']),
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=False)
+
+ def test_scan_devices_updated_non_existing(self):
+ previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set()}
- self.mock_scan_devices(expected, mock_current, registered, updated)
+ fake_current = set([1, 2])
+ updated = set([3])
+ expected = {'current': set([1, 2]),
+ 'updated': set(),
+ 'added': set(),
+ 'removed': set()}
+
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=False)
+
+ def test_scan_devices_updated_on_sync(self):
+ previous = {'current': set([1, 2]),
+ 'updated': set([1]),
+ 'added': set(),
+ 'removed': set()}
+ fake_current = set([1, 2])
+ updated = set([2])
+ expected = {'current': set([1, 2]),
+ 'updated': set([1, 2]),
+ 'added': set([1, 2]),
+ 'removed': set()}
+
+ self._test_scan_devices(previous, updated, fake_current, expected,
+ sync=True)
def test_process_network_devices(self):
agent = self.agent