]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Implements ProcessMonitor to watch over external processes
authorMiguel Angel Ajo <mangelajo@redhat.com>
Mon, 18 Aug 2014 10:59:32 +0000 (12:59 +0200)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Tue, 2 Sep 2014 04:20:08 +0000 (06:20 +0200)
This class takes care of all the spawned external processes,
taking the administrator-configured action in case any of the
external processes dies unexpectedly.

Implements: blueprint agent-child-processes-status

Change-Id: I6bc7a415dde5723dac07589859796c2ffeef5b54

neutron/agent/linux/external_process.py
neutron/tests/functional/agent/linux/simple_daemon.py [new file with mode: 0644]
neutron/tests/functional/agent/linux/test_process_monitor.py [new file with mode: 0644]
neutron/tests/unit/agent/linux/test_process_monitor.py [new file with mode: 0644]
neutron/tests/unit/test_dhcp_agent.py
neutron/tests/unit/test_l3_agent.py
neutron/tests/unit/test_linux_external_process.py

index 672d703551b3bd8d093cae4992fd35d83c2919ee..b399af4369c23aa9fb6f0ab85774184f8081a87b 100644 (file)
 #    under the License.
 #
 # @author: Mark McClain, DreamHost
+import collections
 
+import eventlet
 from oslo.config import cfg
 
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
+from neutron.openstack.common.gettextutils import _LE
+from neutron.openstack.common import lockutils
 from neutron.openstack.common import log as logging
 
 LOG = logging.getLogger(__name__)
 
+
 OPTS = [
     cfg.StrOpt('external_pids',
                default='$state_path/external/pids',
                help=_('Location to store child pid files')),
+    cfg.BoolOpt('check_child_processes', default=False,
+                help=_("Periodically check child processes")),
+    cfg.StrOpt('check_child_processes_action', default='respawn',
+               choices=['respawn', 'exit'],
+               help=_('Action to be executed when a child process dies')),
+    cfg.IntOpt('check_child_processes_interval', default=60,
+               help=_('Interval between checks of child process liveness '
+                      '(seconds)')),
 ]
 
+
 cfg.CONF.register_opts(OPTS)
 
 
@@ -37,23 +51,33 @@ class ProcessManager(object):
     Note: The manager expects uuid to be in cmdline.
     """
     def __init__(self, conf, uuid, root_helper='sudo',
-                 namespace=None, service=None, pids_path=None):
+                 namespace=None, service=None, pids_path=None,
+                 default_cmd_callback=None,
+                 cmd_addl_env=None):
+
         self.conf = conf
         self.uuid = uuid
         self.root_helper = root_helper
         self.namespace = namespace
+        self.default_cmd_callback = default_cmd_callback
+        self.cmd_addl_env = cmd_addl_env
+        self.pids_path = pids_path or self.conf.external_pids
+
         if service:
             self.service_pid_fname = 'pid.' + service
+            self.service = service
         else:
             self.service_pid_fname = 'pid'
-        self.pids_path = pids_path or self.conf.external_pids
+            self.service = 'default-service'
 
-    def enable(self, cmd_callback, reload_cfg=False):
+    def enable(self, cmd_callback=None, reload_cfg=False):
         if not self.active:
+            if not cmd_callback:
+                cmd_callback = self.default_cmd_callback
             cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
 
             ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
-            ip_wrapper.netns.execute(cmd)
+            ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
         elif reload_cfg:
             self.reload_cfg()
 
@@ -105,3 +129,130 @@ class ProcessManager(object):
                 return self.uuid in f.readline()
         except IOError:
             return False
+
+
+ServiceId = collections.namedtuple('ServiceId', ['uuid', 'service'])
+
+
+class ProcessMonitor(object):
+    """Keep track of a set of ProcessManagers and check them periodically.
+
+    Managers are stored in a dict keyed by ServiceId(uuid, service). When
+    the check_child_processes option is set, a green thread is spawned at
+    construction time; it wakes up every check_child_processes_interval
+    seconds and runs check_child_processes_action for each monitored
+    manager that is found inactive.
+    """
+
+    def __init__(self, config, root_helper, resource_type, exit_handler):
+        """Handle multiple process managers and watch over all of them.
+
+        :param config: oslo config object with the agent configuration.
+        :type config: oslo.config.ConfigOpts
+        :param root_helper: root helper to be used with new ProcessManagers
+        :type root_helper: str
+        :param resource_type: can be dhcp, router, load_balancer, etc.
+        :type resource_type: str
+        :param exit_handler: function to execute when agent exit has to
+                             be executed, it should take care of actual
+                             exit
+        :type exit_handler: function
+        """
+        self._config = config
+        self._root_helper = root_helper
+        self._resource_type = resource_type
+        self._exit_handler = exit_handler
+
+        # ServiceId(uuid, service) -> ProcessManager
+        self._process_managers = {}
+
+        if self._config.check_child_processes:
+            self._spawn_checking_thread()
+
+    def enable(self, uuid, cmd_callback, namespace=None, service=None,
+               reload_cfg=False, cmd_addl_env=None):
+        """Creates a process and ensures that it is monitored.
+
+        It will create a new ProcessManager and tie it to the uuid/service.
+        The cmd_callback is stored as the manager's default callback so
+        the process can later be respawned without the caller's help.
+        """
+        process_manager = ProcessManager(conf=self._config,
+                                         uuid=uuid,
+                                         root_helper=self._root_helper,
+                                         namespace=namespace,
+                                         service=service,
+                                         default_cmd_callback=cmd_callback,
+                                         cmd_addl_env=cmd_addl_env)
+
+        process_manager.enable(reload_cfg=reload_cfg)
+        service_id = ServiceId(uuid, service)
+        self._process_managers[service_id] = process_manager
+
+    def disable(self, uuid, namespace=None, service=None):
+        """Disables the process and stops monitoring it."""
+        service_id = ServiceId(uuid, service)
+        process_manager = self._process_managers.pop(service_id, None)
+
+        # we could be trying to disable a process_manager which was
+        # started on a separate run of this agent, or during netns-cleanup
+        # therefore we won't know about such uuid and we need to
+        # build the process_manager to kill it
+        if not process_manager:
+            process_manager = ProcessManager(conf=self._config,
+                                             uuid=uuid,
+                                             root_helper=self._root_helper,
+                                             namespace=namespace,
+                                             service=service)
+
+        process_manager.disable()
+
+    def disable_all(self):
+        """Disables every monitored process and stops monitoring them."""
+        # disable() pops entries from the dict, so walk a snapshot of the
+        # keys instead of the live dict.
+        for service_id in list(self._process_managers):
+            self.disable(uuid=service_id.uuid, service=service_id.service)
+
+    def get_process_manager(self, uuid, service=None):
+        """Returns a process manager for manipulation"""
+        service_id = ServiceId(uuid, service)
+        return self._process_managers.get(service_id)
+
+    def _get_process_manager_attribute(self, attribute, uuid, service=None):
+        """Returns an attribute of a monitored manager, False if unknown."""
+        process_manager = self.get_process_manager(uuid, service)
+        if process_manager:
+            return getattr(process_manager, attribute)
+        else:
+            return False
+
+    def is_active(self, uuid, service=None):
+        """Returns the manager's 'active' value, False if not monitored."""
+        return self._get_process_manager_attribute('active', uuid, service)
+
+    def get_pid(self, uuid, service=None):
+        """Returns the manager's 'pid' value, False if not monitored."""
+        return self._get_process_manager_attribute('pid', uuid, service)
+
+    def _spawn_checking_thread(self):
+        """Starts the green thread running the periodic check loop."""
+        eventlet.spawn(self._periodic_checking_thread)
+
+    @lockutils.synchronized("_check_child_processes")
+    def _check_child_processes(self):
+        """Runs the configured action for each inactive monitored process.
+
+        An error is logged for every monitored manager that is not
+        active before the action is executed.
+        """
+        # eventlet.sleep(0) below yields to other green threads, which may
+        # call enable()/disable() and resize the dict; iterating the live
+        # dict would then fail with "dictionary changed size during
+        # iteration". Walk a snapshot of the keys and let .get() skip the
+        # entries that were removed in the meantime.
+        for service_id in list(self._process_managers):
+            pm = self._process_managers.get(service_id)
+
+            if pm and not pm.active:
+                LOG.error(_LE("%(service)s for %(resource_type)s "
+                              "with uuid %(uuid)s not found. "
+                              "The process should not have died"),
+                          {'service': pm.service,
+                           'resource_type': self._resource_type,
+                           'uuid': service_id.uuid})
+                self._execute_action(service_id)
+            eventlet.sleep(0)
+
+    def _periodic_checking_thread(self):
+        """Loops forever, spawning one check per configured interval."""
+        while True:
+            eventlet.sleep(self._config.check_child_processes_interval)
+            eventlet.spawn(self._check_child_processes)
+
+    def _execute_action(self, service_id):
+        """Dispatches to _<check_child_processes_action>_action."""
+        action_function = getattr(
+            self, "_%s_action" % self._config.check_child_processes_action)
+        action_function(service_id)
+
+    def _respawn_action(self, service_id):
+        """Re-enables the manager using its stored default callback."""
+        LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
+                  {'service': service_id.service,
+                   'uuid': service_id.uuid})
+        self._process_managers[service_id].enable()
+
+    def _exit_action(self, service_id):
+        """Logs the exit and hands uuid/service over to the exit handler."""
+        LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
+                      "action"))
+        self._exit_handler(service_id.uuid, service_id.service)
diff --git a/neutron/tests/functional/agent/linux/simple_daemon.py b/neutron/tests/functional/agent/linux/simple_daemon.py
new file mode 100644 (file)
index 0000000..a627d48
--- /dev/null
@@ -0,0 +1,55 @@
+# Copyright 2014 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+from oslo.config import cfg
+
+from neutron.agent.linux import daemon
+
+
+def main():
+
+    class SimpleDaemon(daemon.Daemon):
+        """The purpose of this daemon is to serve as an example, and also as
+        a dummy daemon, which can be invoked by functional testing, it
+        does nothing but setting the pid file, and staying detached in the
+        background.
+        """
+
+        def run(self):
+            while True:
+                eventlet.sleep(10)
+
+    eventlet.monkey_patch()
+    opts = [
+        cfg.StrOpt('uuid',
+                   help=_('uuid provided from the command line '
+                          'so external_process can track us via /proc/'
+                          'cmdline interface.'),
+                   required=True),
+        cfg.StrOpt('pid_file',
+                   help=_('Location of pid file of this process.'),
+                   required=True)
+    ]
+
+    cfg.CONF.register_cli_opts(opts)
+    # Don't get the default configuration file
+    cfg.CONF(project='neutron', default_config_files=[])
+    simple_daemon = SimpleDaemon(cfg.CONF.pid_file,
+                                 uuid=cfg.CONF.uuid)
+    simple_daemon.start()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/neutron/tests/functional/agent/linux/test_process_monitor.py b/neutron/tests/functional/agent/linux/test_process_monitor.py
new file mode 100644 (file)
index 0000000..8327a38
--- /dev/null
@@ -0,0 +1,107 @@
+# Copyright 2014 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+from oslo.config import cfg
+from six import moves
+
+from neutron.agent.linux import external_process
+from neutron.tests.functional.agent.linux import simple_daemon
+from neutron.tests.functional import base
+
+
+UUID_FORMAT = "test-uuid-%d"
+
+
+class BaseTestProcessMonitor(base.BaseSudoTestCase):
+
+    def setUp(self):
+        super(BaseTestProcessMonitor, self).setUp()
+        self._exit_handler_called = False
+        cfg.CONF.set_override('check_child_processes', True)
+        cfg.CONF.set_override('check_child_processes_interval', 1)
+        self._child_processes = []
+        self._ext_processes = None
+        self.addCleanup(self.cleanup_spawned_children)
+
+    def create_child_processes_manager(self, action):
+        cfg.CONF.set_override('check_child_processes_action', action)
+        self._ext_processes = external_process.ProcessMonitor(
+            config=cfg.CONF,
+            root_helper=None,
+            resource_type='test',
+            exit_handler=self._exit_handler)
+
+    def _exit_handler(self, uuid, service):
+        self._exit_handler_called = True
+        self._exit_handler_params = (uuid, service)
+
+    def _make_cmdline_callback(self, uuid):
+        def _cmdline_callback(pidfile):
+            cmdline = ["python", simple_daemon.__file__,
+                       "--uuid=%s" % uuid,
+                       "--pid_file=%s" % pidfile]
+            return cmdline
+        return _cmdline_callback
+
+    def _spawn_n_children(self, n, service=None):
+        self._child_processes = []
+        for child_number in moves.xrange(n):
+            uuid = self._child_uuid(child_number)
+            _callback = self._make_cmdline_callback(uuid)
+            self._ext_processes.enable(uuid=uuid,
+                                       cmd_callback=_callback,
+                                       service=service)
+
+            pm = self._ext_processes.get_process_manager(uuid, service)
+            self._child_processes.append(pm)
+
+    @staticmethod
+    def _child_uuid(child_number):
+        return UUID_FORMAT % child_number
+
+    def _kill_last_child(self):
+        self._child_processes[-1].disable()
+
+    def spawn_child_processes_and_kill_last(self, service=None, number=2):
+        self._spawn_n_children(number, service)
+        self._kill_last_child()
+        self.assertFalse(self._child_processes[-1].active)
+
+    def wait_for_all_childs_respawned(self):
+        def all_childs_active():
+            return all(pm.active for pm in self._child_processes)
+
+        self._wait_for_condition(all_childs_active)
+
+    def _wait_for_condition(self, exit_condition, extra_time=5):
+        # we need to allow extra_time for the check process to happen
+        # and properly execute action over the gone processes under
+        # high load conditions
+        max_wait_time = cfg.CONF.check_child_processes_interval + extra_time
+        with self.assert_max_execution_time(max_wait_time):
+            while not exit_condition():
+                eventlet.sleep(0.01)
+
+    def cleanup_spawned_children(self):
+        if self._ext_processes:
+            self._ext_processes.disable_all()
+
+
+class TestProcessMonitor(BaseTestProcessMonitor):
+
+    def test_respawn_handler(self):
+        self.create_child_processes_manager('respawn')
+        self.spawn_child_processes_and_kill_last()
+        self.wait_for_all_childs_respawned()
diff --git a/neutron/tests/unit/agent/linux/test_process_monitor.py b/neutron/tests/unit/agent/linux/test_process_monitor.py
new file mode 100644 (file)
index 0000000..1f9e0b3
--- /dev/null
@@ -0,0 +1,99 @@
+# Copyright 2014 Red Hat Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+import mock
+
+from neutron.agent.linux import external_process
+from neutron.tests import base
+
+TEST_UUID = 'test-uuid'
+TEST_SERVICE1 = 'testsvc'
+TEST_PID = 1234
+
+
+class BaseTestProcessMonitor(base.BaseTestCase):
+
+    def setUp(self):
+        super(BaseTestProcessMonitor, self).setUp()
+        self.pm_patch = mock.patch("neutron.agent.linux.external_process."
+                                   "ProcessManager", side_effect=mock.Mock)
+        self.pmanager = self.pm_patch.start()
+
+        self.log_patch = mock.patch("neutron.agent.linux.external_process."
+                                    "LOG.error")
+        self.error_log = self.log_patch.start()
+
+        self.spawn_patch = mock.patch("eventlet.spawn")
+        self.eventlent_spawn = self.spawn_patch.start()
+
+        # create a default process monitor
+        self.create_child_process_monitor('respawn')
+
+    def create_child_process_monitor(self, action):
+        self.exit_handler = mock.Mock()
+        conf = mock.Mock()
+        conf.check_child_processes_action = action
+        conf.check_child_processes = True
+        self.pmonitor = external_process.ProcessMonitor(
+            config=conf,
+            root_helper=None,
+            resource_type='test',
+            exit_handler=self.exit_handler)
+
+    def get_monitored_process_manager(self, uuid, service=None):
+        self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None)
+        return self.pmonitor.get_process_manager(uuid, service)
+
+
+class TestProcessMonitor(BaseTestProcessMonitor):
+    """Unit tests for ProcessMonitor with ProcessManager mocked out."""
+
+    def test_error_logged(self):
+        pm = self.get_monitored_process_manager(TEST_UUID)
+        pm.active = False
+        self.pmonitor._check_child_processes()
+        self.assertTrue(self.error_log.called)
+
+    def test_exit_handler(self):
+        self.create_child_process_monitor('exit')
+        pm = self.get_monitored_process_manager(TEST_UUID)
+        pm.active = False
+        self.pmonitor._check_child_processes()
+        self.exit_handler.assert_called_once_with(TEST_UUID, None)
+
+    def test_different_service_types(self):
+        # same uuid, different service: two distinct managers are expected
+        pm_none = self.get_monitored_process_manager(TEST_UUID)
+        pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
+        self.assertNotEqual(pm_none, pm_svc1)
+
+    def test_active_method(self, service=None):
+        pm = self.get_monitored_process_manager(TEST_UUID, service)
+        pm.active = False
+        self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
+        pm.active = True
+        self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))
+
+    def test_active_method_with_service(self):
+        self.test_active_method(TEST_SERVICE1)
+
+    def test_pid_method(self, service=None):
+        pm = self.get_monitored_process_manager(TEST_UUID, service)
+        pm.pid = TEST_PID
+        self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))
+
+    def test_pid_method_with_service(self):
+        # the argument is the service name, not the pid
+        self.test_pid_method(TEST_SERVICE1)
+
+    def test_pid_method_unknown_uuid(self):
+        self.assertFalse(self.pmonitor.get_pid('bad-uuid'))
index 3740199ae4b7cacaf12a54960e811a786d263dc3..b87abefa749b5f6bbfaa561106be1fe153346714 100644 (file)
@@ -665,7 +665,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
                         mock.ANY,
                         '--debug',
                         ('--log-file=neutron-ns-metadata-proxy-%s.log' %
-                         fake_meta_network.id)])
+                         fake_meta_network.id)], addl_env=None)
                 ])
         finally:
             self.external_process_p.start()
index 5e80b66e0a3f5744c9c2d40e38aca41b8775a4fb..3cd1a6f4d84eeca8b7a82edb17fb9aefa7e04eea 100644 (file)
@@ -2179,7 +2179,7 @@ class TestL3AgentEventHandler(base.BaseTestCase):
                         '--debug',
                         '--log-file=neutron-ns-metadata-proxy-%s.log' %
                         router_id
-                    ])
+                    ], addl_env=None)
                 ])
         finally:
             self.external_process_p.start()
index da024042a7e06ece4059de0fb9c5f67226427f8f..d915a34e719fb2ae411efc4a832136ed72ec4d11 100644 (file)
@@ -61,8 +61,8 @@ class TestProcessManager(base.BaseTestCase):
                     name.assert_called_once_with(ensure_pids_dir=True)
                     ip_lib.assert_has_calls([
                         mock.call.IPWrapper('sudo', 'ns'),
-                        mock.call.IPWrapper().netns.execute(['the', 'cmd'])]
-                    )
+                        mock.call.IPWrapper().netns.execute(['the', 'cmd'],
+                                                            addl_env=None)])
 
     def test_enable_with_namespace_process_active(self):
         callback = mock.Mock()