]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Introduce ip address monitor
authorAssaf Muller <amuller@redhat.com>
Sun, 22 Feb 2015 00:28:54 +0000 (19:28 -0500)
committerAssaf Muller <amuller@redhat.com>
Wed, 4 Mar 2015 01:06:14 +0000 (20:06 -0500)
In Juno, we used keepalived notifier scripts to report the local
state of an HA router's state. These have been found to be
unreliable. The proposed approach is to not use them altogether.
Instead, monitor the omnipresent VIP on the HA device - It is
only configured on the master instance. In order to do that,
we'll use the 'ip monitor address' wrapper introduced in this patch
to get address change events as they happen to avoid polling.

Related-Bug: #1402010
Change-Id: Icc2c07efb7e20008ff5b07d7df2104e6099091d7

etc/neutron/rootwrap.d/l3.filters
neutron/agent/linux/async_process.py
neutron/agent/linux/ip_monitor.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/test_async_process.py
neutron/tests/functional/agent/linux/test_ip_monitor.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/test_utils.py
neutron/tests/unit/agent/linux/test_async_process.py
neutron/tests/unit/agent/linux/test_ip_monitor.py [new file with mode: 0644]

index be69b32c57e5b5ecc1eb0b626aa418da50e52285..a3818a2dcf780b0698a20b1cc7887e1638d4a106 100644 (file)
@@ -31,6 +31,9 @@ kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP
 ip: IpFilter, ip, root
 ip_exec: IpNetnsExecFilter, ip, root
 
+# For ip monitor
+kill_ip_monitor: KillFilter, root, ip, -9
+
 # ovs_lib (if OVSInterfaceDriver is used)
 ovs-vsctl: CommandFilter, ovs-vsctl, root
 
index 79f80ac298f7b572fcd22bbb372369b12ed9496a..8da4ae2c30f280fc4a9bc0fbe389a068aafe6fa4 100644 (file)
@@ -87,10 +87,10 @@ class AsyncProcess(object):
         return utils.pid_invoked_with_cmdline(
             self.pid, self.cmd_without_namespace)
 
-    def start(self, blocking=False):
+    def start(self, block=False):
         """Launch a process and monitor it asynchronously.
 
-        :param blocking: Block until the process has started.
+        :param block: Block until the process has started.
         :raises eventlet.timeout.Timeout if blocking is True and the process
                 did not start in time.
         """
@@ -100,13 +100,13 @@ class AsyncProcess(object):
             LOG.debug('Launching async process [%s].', self.cmd)
             self._spawn()
 
-        if blocking:
+        if block:
             utils.wait_until_true(self.is_active)
 
-    def stop(self, blocking=False):
+    def stop(self, block=False):
         """Halt the process and watcher threads.
 
-        :param blocking: Block until the process has stopped.
+        :param block: Block until the process has stopped.
         :raises eventlet.timeout.Timeout if blocking is True and the process
                 did not stop in time.
         """
@@ -116,7 +116,7 @@ class AsyncProcess(object):
         else:
             raise AsyncProcessException(_('Process is not running.'))
 
-        if blocking:
+        if block:
             utils.wait_until_true(lambda: not self.is_active())
 
     def _spawn(self):
@@ -216,15 +216,15 @@ class AsyncProcess(object):
     def _read_stderr(self):
         return self._read(self._process.stderr, self._stderr_lines)
 
-    def _iter_queue(self, queue):
+    def _iter_queue(self, queue, block):
         while True:
             try:
-                yield queue.get_nowait()
+                yield queue.get(block=block)
             except eventlet.queue.Empty:
                 break
 
-    def iter_stdout(self):
-        return self._iter_queue(self._stdout_lines)
+    def iter_stdout(self, block=False):
+        return self._iter_queue(self._stdout_lines, block)
 
-    def iter_stderr(self):
-        return self._iter_queue(self._stderr_lines)
+    def iter_stderr(self, block=False):
+        return self._iter_queue(self._stderr_lines, block)
diff --git a/neutron/agent/linux/ip_monitor.py b/neutron/agent/linux/ip_monitor.py
new file mode 100644 (file)
index 0000000..f7485c2
--- /dev/null
@@ -0,0 +1,85 @@
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo_utils import excutils
+
+from neutron.agent.linux import async_process
+from neutron.i18n import _LE
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class IPMonitorEvent(object):
+    def __init__(self, line, added, interface, cidr):
+        self.line = line
+        self.added = added
+        self.interface = interface
+        self.cidr = cidr
+
+    def __str__(self):
+        return self.line
+
+    @classmethod
+    def from_text(cls, line):
+        route = line.split()
+
+        try:
+            first_word = route[0]
+        except IndexError:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE('Unable to parse route "%s"'), line)
+
+        added = (first_word != 'Deleted')
+        if not added:
+            route = route[1:]
+
+        try:
+            interface = route[1]
+            cidr = route[3]
+        except IndexError:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE('Unable to parse route "%s"'), line)
+
+        return cls(line, added, interface, cidr)
+
+
+class IPMonitor(async_process.AsyncProcess):
+    """Wrapper over `ip monitor address`.
+
+    To monitor and react indefinitely:
+        m = IPMonitor(namespace='tmp')
+        m.start()
+        for iterable in m:
+            event = IPMonitorEvent.from_text(iterable)
+            print event, event.added, event.interface, event.cidr
+    """
+
+    def __init__(self,
+                 namespace=None,
+                 respawn_interval=None):
+        super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'],
+                                        run_as_root=True,
+                                        respawn_interval=respawn_interval,
+                                        namespace=namespace)
+
+    def __iter__(self):
+        return self.iter_stdout(block=True)
+
+    def start(self):
+        super(IPMonitor, self).start(block=True)
+
+    def stop(self):
+        super(IPMonitor, self).stop(block=True)
index afb9d354a6fc4c62656a5813ad9329d5b8c925a7..8fddb6d4b39b74ee468410dcc12f8b1fd3d05b69 100644 (file)
@@ -50,9 +50,9 @@ class TestAsyncProcess(AsyncProcessTestFramework):
         proc = async_process.AsyncProcess(['tail', '-f',
                                            self.test_file_path])
         self.addCleanup(self._safe_stop, proc)
-        proc.start(blocking=True)
+        proc.start(block=True)
         self._check_stdout(proc)
-        proc.stop(blocking=True)
+        proc.stop(block=True)
 
         # Ensure that the process and greenthreads have stopped
         proc._process.wait()
diff --git a/neutron/tests/functional/agent/linux/test_ip_monitor.py b/neutron/tests/functional/agent/linux/test_ip_monitor.py
new file mode 100644 (file)
index 0000000..dd0fd12
--- /dev/null
@@ -0,0 +1,67 @@
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.agent.linux import async_process
+from neutron.agent.linux import ip_monitor
+from neutron.tests.functional.agent.linux import test_ip_lib
+
+
+class TestIPMonitor(test_ip_lib.IpLibTestFramework):
+    def setUp(self):
+        super(TestIPMonitor, self).setUp()
+        attr = self.generate_device_details()
+        self.device = self.manage_device(attr)
+        self.monitor = ip_monitor.IPMonitor(attr.namespace)
+        self.addCleanup(self._safe_stop_monitor)
+
+    def _safe_stop_monitor(self):
+        try:
+            self.monitor.stop()
+        except async_process.AsyncProcessException:
+            pass
+
+    def test_ip_monitor_lifecycle(self):
+        self.assertFalse(self.monitor.is_active())
+        self.monitor.start()
+        self.assertTrue(self.monitor.is_active())
+        self.monitor.stop()
+        self.assertFalse(self.monitor.is_active())
+
+    def test_ip_monitor_events(self):
+        self.monitor.start()
+
+        cidr = '169.254.128.1/24'
+        self.device.addr.add(4, cidr, '169.254.128.255')
+        self._assert_event(expected_name=self.device.name,
+                           expected_cidr=cidr,
+                           expected_added=True,
+                           event=ip_monitor.IPMonitorEvent.from_text(
+                               next(self.monitor.iter_stdout(block=True))))
+
+        self.device.addr.delete(4, cidr)
+        self._assert_event(expected_name=self.device.name,
+                           expected_cidr=cidr,
+                           expected_added=False,
+                           event=ip_monitor.IPMonitorEvent.from_text(
+                               next(self.monitor.iter_stdout(block=True))))
+
+    def _assert_event(self,
+                      expected_name,
+                      expected_cidr,
+                      expected_added,
+                      event):
+        self.assertEqual(expected_name, event.interface)
+        self.assertEqual(expected_added, event.added)
+        self.assertEqual(expected_cidr, event.cidr)
index a9e8671816eda69070527779f80cb647eb42f619..5508457218eda66c0ae23b4aa6c09fa156fd6c93 100644 (file)
@@ -24,7 +24,7 @@ class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
     def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
         cmd = ['tail', '-f', self.test_file_path]
         proc = async_process.AsyncProcess(cmd)
-        proc.start(blocking=True)
+        proc.start(block=True)
         self.addCleanup(proc.stop)
 
         pid = proc.pid
index b35af6e372af2ef56f87a7a8f8135f3750ab3f1e..6856d86468246d6dc643355f00a0036291332bea 100644 (file)
@@ -128,13 +128,14 @@ class TestAsyncProcess(base.BaseTestCase):
         mock_start.assert_called_once_with()
 
     def test__iter_queue_returns_empty_list_for_empty_queue(self):
-        result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
+        result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
+                                            False))
         self.assertEqual(result, [])
 
     def test__iter_queue_returns_queued_data(self):
         queue = eventlet.queue.LightQueue()
         queue.put('foo')
-        result = list(self.proc._iter_queue(queue))
+        result = list(self.proc._iter_queue(queue, False))
         self.assertEqual(result, ['foo'])
 
     def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
@@ -146,7 +147,7 @@ class TestAsyncProcess(base.BaseTestCase):
 
         self.assertEqual(value, expected_value)
         queue = getattr(self.proc, '_%s_lines' % output_type, None)
-        mock_iter_queue.assert_called_with(queue)
+        mock_iter_queue.assert_called_with(queue, False)
 
     def test_iter_stdout(self):
         self._test_iter_output_calls_iter_queue_on_output_queue('stdout')
diff --git a/neutron/tests/unit/agent/linux/test_ip_monitor.py b/neutron/tests/unit/agent/linux/test_ip_monitor.py
new file mode 100644 (file)
index 0000000..ea71237
--- /dev/null
@@ -0,0 +1,36 @@
+# Copyright 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.agent.linux import ip_monitor
+from neutron.tests import base
+
+
+class TestIPMonitorEvent(base.BaseTestCase):
+    def test_from_text_parses_added_line(self):
+        event = ip_monitor.IPMonitorEvent.from_text(
+            '3: wlp3s0    inet 192.168.3.59/24 brd 192.168.3.255 '
+            'scope global dynamic wlp3s0\       valid_lft 300sec '
+            'preferred_lft 300sec')
+        self.assertEqual('wlp3s0', event.interface)
+        self.assertTrue(event.added)
+        self.assertEqual('192.168.3.59/24', event.cidr)
+
+    def test_from_text_parses_deleted_line(self):
+        event = ip_monitor.IPMonitorEvent.from_text(
+            'Deleted 1: lo    inet 127.0.0.2/8 scope host secondary lo\''
+            '       valid_lft forever preferred_lft forever')
+        self.assertEqual('lo', event.interface)
+        self.assertFalse(event.added)
+        self.assertEqual('127.0.0.2/8', event.cidr)