import eventlet
from oslo_log import log as logging
+from oslo_serialization import jsonutils
from neutron.agent.linux import async_process
+from neutron.agent.ovsdb import api as ovsdb
from neutron.i18n import _LE
LOG = logging.getLogger(__name__)
+OVSDB_ACTION_INITIAL = 'initial'
+OVSDB_ACTION_INSERT = 'insert'
+OVSDB_ACTION_DELETE = 'delete'
+
class OvsdbMonitor(async_process.AsyncProcess):
"""Manages an invocation of 'ovsdb-client monitor'."""
def __init__(self, respawn_interval=None):
super(SimpleInterfaceMonitor, self).__init__(
'Interface',
- columns=['name', 'ofport'],
+ columns=['name', 'ofport', 'external_ids'],
format='json',
respawn_interval=respawn_interval,
)
self.data_received = False
+ self.new_events = {'added': [], 'removed': []}
@property
def has_updates(self):
"""Indicate whether the ovsdb Interface table has been updated.
- True will be returned if the monitor process is not active.
- This 'failing open' minimizes the risk of falsely indicating
- the absence of updates at the expense of potential false
- positives.
+ If the monitor process is not active an error will be logged since
+ it won't be able to communicate any update. This situation should be
+ temporary if respawn_interval is set.
"""
- return bool(list(self.iter_stdout())) or not self.is_active()
+ if not self.is_active():
+ LOG.error(_LE("Interface monitor is not active"))
+ else:
+ self.process_events()
+ return bool(self.new_events['added'] or self.new_events['removed'])
+
+ def get_events(self):
+ self.process_events()
+ events = self.new_events
+ self.new_events = {'added': [], 'removed': []}
+ return events
+
+ def process_events(self):
+ devices_added = []
+ devices_removed = []
+ for row in self.iter_stdout():
+ json = jsonutils.loads(row).get('data')
+ for ovs_id, action, name, ofport, external_ids in json:
+ if external_ids:
+ external_ids = ovsdb.val_to_py(external_ids)
+ device = {'name': name,
+ 'ofport': ofport,
+ 'external_ids': external_ids}
+ if action in (OVSDB_ACTION_INITIAL, OVSDB_ACTION_INSERT):
+ devices_added.append(device)
+ elif action == OVSDB_ACTION_DELETE:
+ devices_removed.append(device)
+ self.new_events['added'].extend(devices_added)
+ self.new_events['removed'].extend(devices_removed)
def start(self, block=False, timeout=5):
super(SimpleInterfaceMonitor, self).start()
# collect output.
eventlet.sleep()
return self._monitor.has_updates
+
+ def get_events(self):
+ return self._monitor.get_events()
class OVSPortFixture(PortFixture):
+ def __init__(self, bridge=None, namespace=None, attrs=None):
+ super(OVSPortFixture, self).__init__(bridge, namespace)
+ if attrs is None:
+ attrs = []
+ self.attrs = attrs
+
def _create_bridge_fixture(self):
return OVSBridgeFixture()
self.port.link.set_up()
def create_port(self, name):
- self.bridge.add_port(name, ('type', 'internal'))
+ self.attrs.insert(0, ('type', 'internal'))
+ self.bridge.add_port(name, *self.attrs)
return name
utils.wait_until_true(lambda: self.monitor.data_received is True)
self.assertTrue(self.monitor.has_updates,
'Initial call should always be true')
- self.assertFalse(self.monitor.has_updates,
- 'has_updates without port addition should be False')
+ # clear the event list
+ self.monitor.get_events()
self.useFixture(net_helpers.OVSPortFixture())
# has_updates after port addition should become True
- while not self.monitor.has_updates:
- eventlet.sleep(0.01)
+ utils.wait_until_true(lambda: self.monitor.has_updates is True)
+
+ def _expected_devices_events(self, devices, state):
+ """Helper to check that events are received for expected devices.
+
+ :param devices: The list of expected devices. WARNING: This list
+ is modified by this method
+ :param state: The state of the devices (added or removed)
+ """
+ events = self.monitor.get_events()
+ event_devices = [
+ (dev['name'], dev['external_ids']) for dev in events.get(state)]
+ for dev in event_devices:
+ if dev[0] in devices:
+ devices.remove(dev[0])
+ self.assertEqual(dev[1].get('iface-status'), 'active')
+ if not devices:
+ return True
+
+ def test_get_events(self):
+ utils.wait_until_true(lambda: self.monitor.data_received is True)
+ devices = self.monitor.get_events()
+ self.assertTrue(devices.get('added'),
+ 'Initial call should always be true')
+ p_attrs = [('external_ids', {'iface-status': 'active'})]
+ br = self.useFixture(net_helpers.OVSBridgeFixture())
+ p1 = self.useFixture(net_helpers.OVSPortFixture(
+ br.bridge, None, p_attrs))
+ p2 = self.useFixture(net_helpers.OVSPortFixture(
+ br.bridge, None, p_attrs))
+ added_devices = [p1.port.name, p2.port.name]
+ utils.wait_until_true(
+ lambda: self._expected_devices_events(added_devices, 'added'))
+ br.bridge.delete_port(p1.port.name)
+ br.bridge.delete_port(p2.port.name)
+ removed_devices = [p1.port.name, p2.port.name]
+ utils.wait_until_true(
+ lambda: self._expected_devices_events(removed_devices, 'removed'))
+ # restart
+ self.monitor.stop(block=True)
+ self.monitor.start(block=True, timeout=60)
+ devices = self.monitor.get_events()
+ self.assertTrue(devices.get('added'),
+ 'Initial call should always be true')
super(TestSimpleInterfaceMonitor, self).setUp()
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
- def test_has_updates_is_true_by_default(self):
- self.assertTrue(self.monitor.has_updates)
-
def test_has_updates_is_false_if_active_with_no_output(self):
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
'.is_active')
return_value=output):
self.monitor._read_stdout()
self.assertFalse(self.monitor.data_received)
+
+ def test_has_updates_after_calling_get_events_is_false(self):
+ with mock.patch.object(
+ self.monitor, 'process_events') as process_events:
+ self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1']}
+ self.assertTrue(self.monitor.has_updates)
+ self.monitor.get_events()
+ self.assertTrue(process_events.called)
+ self.assertFalse(self.monitor.has_updates)