]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add get_events to OVSDB monitor
authorrossella <rsblendido@suse.com>
Thu, 5 Mar 2015 09:24:10 +0000 (09:24 +0000)
committerrossella <rsblendido@suse.com>
Mon, 8 Jun 2015 13:39:20 +0000 (13:39 +0000)
OVSDB monitor can generate the events that the OVS agent
needs to process (device added or updated). Instead of
notifying only that a change occurred and that polling
is needed, pass the events to the agent

Change-Id: I3d17bf995ad4508c4c6d089de550148da1465fa1
Partially-Implements: blueprint restructure-l2-agent

neutron/agent/linux/ovsdb_monitor.py
neutron/agent/linux/polling.py
neutron/tests/common/net_helpers.py
neutron/tests/functional/agent/linux/test_ovsdb_monitor.py
neutron/tests/unit/agent/linux/test_ovsdb_monitor.py

index 7e0ef251184e2d5dc52bbb511ede307b93f97f42..f992bca25b5932b0b3c7b94a623a994880fd827d 100644 (file)
 
 import eventlet
 from oslo_log import log as logging
+from oslo_serialization import jsonutils
 
 from neutron.agent.linux import async_process
+from neutron.agent.ovsdb import api as ovsdb
 from neutron.i18n import _LE
 
 
 LOG = logging.getLogger(__name__)
 
+OVSDB_ACTION_INITIAL = 'initial'
+OVSDB_ACTION_INSERT = 'insert'
+OVSDB_ACTION_DELETE = 'delete'
+
 
 class OvsdbMonitor(async_process.AsyncProcess):
     """Manages an invocation of 'ovsdb-client monitor'."""
@@ -63,22 +69,50 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
     def __init__(self, respawn_interval=None):
         super(SimpleInterfaceMonitor, self).__init__(
             'Interface',
-            columns=['name', 'ofport'],
+            columns=['name', 'ofport', 'external_ids'],
             format='json',
             respawn_interval=respawn_interval,
         )
         self.data_received = False
+        self.new_events = {'added': [], 'removed': []}
 
     @property
     def has_updates(self):
         """Indicate whether the ovsdb Interface table has been updated.
 
-        True will be returned if the monitor process is not active.
-        This 'failing open' minimizes the risk of falsely indicating
-        the absence of updates at the expense of potential false
-        positives.
+        If the monitor process is not active an error will be logged since
+        it won't be able to communicate any update. This situation should be
+        temporary if respawn_interval is set.
         """
-        return bool(list(self.iter_stdout())) or not self.is_active()
+        if not self.is_active():
+            LOG.error(_LE("Interface monitor is not active"))
+        else:
+            self.process_events()
+        return bool(self.new_events['added'] or self.new_events['removed'])
+
+    def get_events(self):
+        self.process_events()
+        events = self.new_events
+        self.new_events = {'added': [], 'removed': []}
+        return events
+
+    def process_events(self):
+        devices_added = []
+        devices_removed = []
+        for row in self.iter_stdout():
+            json = jsonutils.loads(row).get('data')
+            for ovs_id, action, name, ofport, external_ids in json:
+                if external_ids:
+                    external_ids = ovsdb.val_to_py(external_ids)
+                device = {'name': name,
+                          'ofport': ofport,
+                          'external_ids': external_ids}
+                if action in (OVSDB_ACTION_INITIAL, OVSDB_ACTION_INSERT):
+                    devices_added.append(device)
+                elif action == OVSDB_ACTION_DELETE:
+                    devices_removed.append(device)
+        self.new_events['added'].extend(devices_added)
+        self.new_events['removed'].extend(devices_removed)
 
     def start(self, block=False, timeout=5):
         super(SimpleInterfaceMonitor, self).start()
index dffabf34030b01e47b7e3cbfce0188b0ed22bc76..ac3a4a620c2e684b6d296cc45d1a88c6198f7f24 100644 (file)
@@ -60,3 +60,6 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager):
         # collect output.
         eventlet.sleep()
         return self._monitor.has_updates
+
+    def get_events(self):
+        return self._monitor.get_events()
index 5d665f7f9cee91c99ce9c995e30172e2bfbf625d..ae494f5f358b11d979831d6cd52bd79b24c3973a 100644 (file)
@@ -181,6 +181,12 @@ class OVSBridgeFixture(fixtures.Fixture):
 
 class OVSPortFixture(PortFixture):
 
+    def __init__(self, bridge=None, namespace=None, attrs=None):
+        super(OVSPortFixture, self).__init__(bridge, namespace)
+        if attrs is None:
+            attrs = []
+        self.attrs = attrs
+
     def _create_bridge_fixture(self):
         return OVSBridgeFixture()
 
@@ -196,7 +202,8 @@ class OVSPortFixture(PortFixture):
         self.port.link.set_up()
 
     def create_port(self, name):
-        self.bridge.add_port(name, ('type', 'internal'))
+        self.attrs.insert(0, ('type', 'internal'))
+        self.bridge.add_port(name, *self.attrs)
         return name
 
 
index a9ae8c2365ebe9eada5992e77a350e6a0296a951..fc49b1ae4d199c8322a9367f45d30caf342a700d 100644 (file)
@@ -107,9 +107,51 @@ class TestSimpleInterfaceMonitor(BaseMonitorTest):
         utils.wait_until_true(lambda: self.monitor.data_received is True)
         self.assertTrue(self.monitor.has_updates,
                         'Initial call should always be true')
-        self.assertFalse(self.monitor.has_updates,
-                         'has_updates without port addition should be False')
+        # clear the event list
+        self.monitor.get_events()
         self.useFixture(net_helpers.OVSPortFixture())
         # has_updates after port addition should become True
-        while not self.monitor.has_updates:
-            eventlet.sleep(0.01)
+        utils.wait_until_true(lambda: self.monitor.has_updates is True)
+
+    def _expected_devices_events(self, devices, state):
+        """Helper to check that events are received for expected devices.
+
+        :param devices: The list of expected devices. WARNING: This list
+          is modified by this method
+        :param state: The state of the devices (added or removed)
+        """
+        events = self.monitor.get_events()
+        event_devices = [
+            (dev['name'], dev['external_ids']) for dev in events.get(state)]
+        for dev in event_devices:
+            if dev[0] in devices:
+                devices.remove(dev[0])
+                self.assertEqual(dev[1].get('iface-status'), 'active')
+            if not devices:
+                return True
+
+    def test_get_events(self):
+        utils.wait_until_true(lambda: self.monitor.data_received is True)
+        devices = self.monitor.get_events()
+        self.assertTrue(devices.get('added'),
+                        'Initial call should always be true')
+        p_attrs = [('external_ids', {'iface-status': 'active'})]
+        br = self.useFixture(net_helpers.OVSBridgeFixture())
+        p1 = self.useFixture(net_helpers.OVSPortFixture(
+            br.bridge, None, p_attrs))
+        p2 = self.useFixture(net_helpers.OVSPortFixture(
+            br.bridge, None, p_attrs))
+        added_devices = [p1.port.name, p2.port.name]
+        utils.wait_until_true(
+            lambda: self._expected_devices_events(added_devices, 'added'))
+        br.bridge.delete_port(p1.port.name)
+        br.bridge.delete_port(p2.port.name)
+        removed_devices = [p1.port.name, p2.port.name]
+        utils.wait_until_true(
+            lambda: self._expected_devices_events(removed_devices, 'removed'))
+        # restart
+        self.monitor.stop(block=True)
+        self.monitor.start(block=True, timeout=60)
+        devices = self.monitor.get_events()
+        self.assertTrue(devices.get('added'),
+                        'Initial call should always be true')
index 9b8b97687060f31cc9579ea6736abadc50aeaa01..604d6cc4ad86a729284e3e4e18cf412bc728ce7f 100644 (file)
@@ -55,9 +55,6 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
         super(TestSimpleInterfaceMonitor, self).setUp()
         self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
 
-    def test_has_updates_is_true_by_default(self):
-        self.assertTrue(self.monitor.has_updates)
-
     def test_has_updates_is_false_if_active_with_no_output(self):
         target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
                   '.is_active')
@@ -87,3 +84,12 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
                 return_value=output):
             self.monitor._read_stdout()
         self.assertFalse(self.monitor.data_received)
+
+    def test_has_updates_after_calling_get_events_is_false(self):
+        with mock.patch.object(
+                self.monitor, 'process_events') as process_events:
+            self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1']}
+            self.assertTrue(self.monitor.has_updates)
+            self.monitor.get_events()
+            self.assertTrue(process_events.called)
+            self.assertFalse(self.monitor.has_updates)