From: Miguel Angel Ajo Date: Mon, 18 Aug 2014 10:59:32 +0000 (+0200) Subject: Implements ProcessMonitor to watch over external processes X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=7757e8e9dbece8c142a0d1302798d9caf31fdbd3;p=openstack-build%2Fneutron-build.git Implements ProcessMonitor to watch over external processes This class takes care of all the spawned external processes, taking the administrator configured action in the case of any of the external processes die unexpectedly. Implements: blueprint agent-child-processes-status Change-Id: I6bc7a415dde5723dac07589859796c2ffeef5b54 --- diff --git a/neutron/agent/linux/external_process.py b/neutron/agent/linux/external_process.py index 672d70355..b399af436 100644 --- a/neutron/agent/linux/external_process.py +++ b/neutron/agent/linux/external_process.py @@ -13,21 +13,35 @@ # under the License. # # @author: Mark McClain, DreamHost +import collections +import eventlet from oslo.config import cfg from neutron.agent.linux import ip_lib from neutron.agent.linux import utils +from neutron.openstack.common.gettextutils import _LE +from neutron.openstack.common import lockutils from neutron.openstack.common import log as logging LOG = logging.getLogger(__name__) + OPTS = [ cfg.StrOpt('external_pids', default='$state_path/external/pids', help=_('Location to store child pid files')), + cfg.BoolOpt('check_child_processes', default=False, + help=_("Periodically check child processes")), + cfg.StrOpt('check_child_processes_action', default='respawn', + choices=['respawn', 'exit'], + help=_('Action to be executed when a child process dies')), + cfg.IntOpt('check_child_processes_interval', default=60, + help=_('Interval between checks of child process liveness ' + '(seconds)')), ] + cfg.CONF.register_opts(OPTS) @@ -37,23 +51,33 @@ class ProcessManager(object): Note: The manager expects uuid to be in cmdline. 
""" def __init__(self, conf, uuid, root_helper='sudo', - namespace=None, service=None, pids_path=None): + namespace=None, service=None, pids_path=None, + default_cmd_callback=None, + cmd_addl_env=None): + self.conf = conf self.uuid = uuid self.root_helper = root_helper self.namespace = namespace + self.default_cmd_callback = default_cmd_callback + self.cmd_addl_env = cmd_addl_env + self.pids_path = pids_path or self.conf.external_pids + if service: self.service_pid_fname = 'pid.' + service + self.service = service else: self.service_pid_fname = 'pid' - self.pids_path = pids_path or self.conf.external_pids + self.service = 'default-service' - def enable(self, cmd_callback, reload_cfg=False): + def enable(self, cmd_callback=None, reload_cfg=False): if not self.active: + if not cmd_callback: + cmd_callback = self.default_cmd_callback cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True)) ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace) - ip_wrapper.netns.execute(cmd) + ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env) elif reload_cfg: self.reload_cfg() @@ -105,3 +129,130 @@ class ProcessManager(object): return self.uuid in f.readline() except IOError: return False + + +ServiceId = collections.namedtuple('ServiceId', ['uuid', 'service']) + + +class ProcessMonitor(object): + + def __init__(self, config, root_helper, resource_type, exit_handler): + """Handle multiple process managers and watch over all of them. + + :param config: oslo config object with the agent configuration. + :type config: oslo.config.ConfigOpts + :param root_helper: root helper to be used with new ProcessManagers + :type root_helper: str + :param resource_type: can be dhcp, router, load_balancer, etc. 
+ :type resource_type: str + :param exit_handler: function to execute when agent exit has to + be executed, it should take care of actual + exit + :type exit_handler: function + """ + self._config = config + self._root_helper = root_helper + self._resource_type = resource_type + self._exit_handler = exit_handler + + self._process_managers = {} + + if self._config.check_child_processes: + self._spawn_checking_thread() + + def enable(self, uuid, cmd_callback, namespace=None, service=None, + reload_cfg=False, cmd_addl_env=None): + """Creates a process and ensures that it is monitored. + + It will create a new ProcessManager and tie it to the uuid/service. + """ + process_manager = ProcessManager(conf=self._config, + uuid=uuid, + root_helper=self._root_helper, + namespace=namespace, + service=service, + default_cmd_callback=cmd_callback, + cmd_addl_env=cmd_addl_env) + + process_manager.enable(reload_cfg=reload_cfg) + service_id = ServiceId(uuid, service) + self._process_managers[service_id] = process_manager + + def disable(self, uuid, namespace=None, service=None): + """Disables the process and stops monitoring it.""" + service_id = ServiceId(uuid, service) + process_manager = self._process_managers.pop(service_id, None) + + # we could be trying to disable a process_manager which was + # started on a separate run of this agent, or during netns-cleanup + # therefore we won't know about such uuid and we need to + # build the process_manager to kill it + if not process_manager: + process_manager = ProcessManager(conf=self._config, + uuid=uuid, + root_helper=self._root_helper, + namespace=namespace, + service=service) + + process_manager.disable() + + def disable_all(self): + """Disables every monitored process and stops monitoring them.""" + # iterate over a snapshot of the keys: disable() pops entries from + # self._process_managers while we are looping + for service_id in list(self._process_managers): + self.disable(uuid=service_id.uuid, service=service_id.service) + + def get_process_manager(self, uuid, service=None): + """Returns a process manager for manipulation""" + service_id = ServiceId(uuid, service) + return 
self._process_managers.get(service_id) + + def _get_process_manager_attribute(self, attribute, uuid, service=None): + process_manager = self.get_process_manager(uuid, service) + if process_manager: + return getattr(process_manager, attribute) + else: + return False + + def is_active(self, uuid, service=None): + return self._get_process_manager_attribute('active', uuid, service) + + def get_pid(self, uuid, service=None): + return self._get_process_manager_attribute('pid', uuid, service) + + def _spawn_checking_thread(self): + eventlet.spawn(self._periodic_checking_thread) + + @lockutils.synchronized("_check_child_processes") + def _check_child_processes(self): + # iterate over a snapshot of the keys: this loop yields to other + # greenthreads (eventlet.sleep(0) below), and they may add or remove + # process managers, which would otherwise raise RuntimeError here + for service_id in list(self._process_managers): + pm = self._process_managers.get(service_id) + + if pm and not pm.active: + LOG.error(_LE("%(service)s for %(resource_type)s " + "with uuid %(uuid)s not found. " + "The process should not have died"), + {'service': pm.service, + 'resource_type': self._resource_type, + 'uuid': service_id.uuid}) + self._execute_action(service_id) + eventlet.sleep(0) + + def _periodic_checking_thread(self): + while True: + eventlet.sleep(self._config.check_child_processes_interval) + eventlet.spawn(self._check_child_processes) + + def _execute_action(self, service_id): + action_function = getattr( + self, "_%s_action" % self._config.check_child_processes_action) + action_function(service_id) + + def _respawn_action(self, service_id): + LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"), + {'service': service_id.service, + 'uuid': service_id.uuid}) + self._process_managers[service_id].enable() + + def _exit_action(self, service_id): + LOG.error(_LE("Exiting agent as programmed in check_child_processes_" + "actions")) + self._exit_handler(service_id.uuid, service_id.service) diff --git a/neutron/tests/functional/agent/linux/simple_daemon.py b/neutron/tests/functional/agent/linux/simple_daemon.py new file mode 100644 index 000000000..a627d4857 --- /dev/null +++ 
b/neutron/tests/functional/agent/linux/simple_daemon.py @@ -0,0 +1,55 @@ +# Copyright 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +from oslo.config import cfg + +from neutron.agent.linux import daemon + + +def main(): + + class SimpleDaemon(daemon.Daemon): + """The purpose of this daemon is to serve as an example, and also as + a dummy daemon, which can be invoked by functional testing, it + does nothing but setting the pid file, and staying detached in the + background. + """ + + def run(self): + while True: + eventlet.sleep(10) + + eventlet.monkey_patch() + opts = [ + cfg.StrOpt('uuid', + help=_('uuid provided from the command line ' + 'so external_process can track us via /proc/' + 'cmdline interface.'), + required=True), + cfg.StrOpt('pid_file', + help=_('Location of pid file of this process.'), + required=True) + ] + + cfg.CONF.register_cli_opts(opts) + # Don't get the default configuration file + cfg.CONF(project='neutron', default_config_files=[]) + simple_daemon = SimpleDaemon(cfg.CONF.pid_file, + uuid=cfg.CONF.uuid) + simple_daemon.start() + + +if __name__ == "__main__": + main() diff --git a/neutron/tests/functional/agent/linux/test_process_monitor.py b/neutron/tests/functional/agent/linux/test_process_monitor.py new file mode 100644 index 000000000..8327a38b1 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_process_monitor.py @@ -0,0 +1,107 @@ +# Copyright 2014 Red Hat, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +from oslo.config import cfg +from six import moves + +from neutron.agent.linux import external_process +from neutron.tests.functional.agent.linux import simple_daemon +from neutron.tests.functional import base + + +UUID_FORMAT = "test-uuid-%d" + + +class BaseTestProcessMonitor(base.BaseSudoTestCase): + + def setUp(self): + super(BaseTestProcessMonitor, self).setUp() + self._exit_handler_called = False + cfg.CONF.set_override('check_child_processes', True) + cfg.CONF.set_override('check_child_processes_interval', 1) + self._child_processes = [] + self._ext_processes = None + self.addCleanup(self.cleanup_spawned_children) + + def create_child_processes_manager(self, action): + cfg.CONF.set_override('check_child_processes_action', action) + self._ext_processes = external_process.ProcessMonitor( + config=cfg.CONF, + root_helper=None, + resource_type='test', + exit_handler=self._exit_handler) + + def _exit_handler(self, uuid, service): + self._exit_handler_called = True + self._exit_handler_params = (uuid, service) + + def _make_cmdline_callback(self, uuid): + def _cmdline_callback(pidfile): + cmdline = ["python", simple_daemon.__file__, + "--uuid=%s" % uuid, + "--pid_file=%s" % pidfile] + return cmdline + return _cmdline_callback + + def _spawn_n_children(self, n, service=None): + self._child_processes = [] + for child_number in moves.xrange(n): + uuid = self._child_uuid(child_number) + _callback = 
self._make_cmdline_callback(uuid) + self._ext_processes.enable(uuid=uuid, + cmd_callback=_callback, + service=service) + + pm = self._ext_processes.get_process_manager(uuid, service) + self._child_processes.append(pm) + + @staticmethod + def _child_uuid(child_number): + return UUID_FORMAT % child_number + + def _kill_last_child(self): + self._child_processes[-1].disable() + + def spawn_child_processes_and_kill_last(self, service=None, number=2): + self._spawn_n_children(number, service) + self._kill_last_child() + self.assertFalse(self._child_processes[-1].active) + + def wait_for_all_childs_respawned(self): + def all_childs_active(): + return all(pm.active for pm in self._child_processes) + + self._wait_for_condition(all_childs_active) + + def _wait_for_condition(self, exit_condition, extra_time=5): + # we need to allow extra_time for the check process to happen + # and properly execute action over the gone processes under + # high load conditions + max_wait_time = cfg.CONF.check_child_processes_interval + extra_time + with self.assert_max_execution_time(max_wait_time): + while not exit_condition(): + eventlet.sleep(0.01) + + def cleanup_spawned_children(self): + if self._ext_processes: + self._ext_processes.disable_all() + + +class TestProcessMonitor(BaseTestProcessMonitor): + + def test_respawn_handler(self): + self.create_child_processes_manager('respawn') + self.spawn_child_processes_and_kill_last() + self.wait_for_all_childs_respawned() diff --git a/neutron/tests/unit/agent/linux/test_process_monitor.py b/neutron/tests/unit/agent/linux/test_process_monitor.py new file mode 100644 index 000000000..1f9e0b36b --- /dev/null +++ b/neutron/tests/unit/agent/linux/test_process_monitor.py @@ -0,0 +1,99 @@ +# Copyright 2014 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import mock + +from neutron.agent.linux import external_process +from neutron.tests import base + +TEST_UUID = 'test-uuid' +TEST_SERVICE1 = 'testsvc' +TEST_PID = 1234 + + +class BaseTestProcessMonitor(base.BaseTestCase): + + def setUp(self): + super(BaseTestProcessMonitor, self).setUp() + self.pm_patch = mock.patch("neutron.agent.linux.external_process." + "ProcessManager", side_effect=mock.Mock) + self.pmanager = self.pm_patch.start() + + self.log_patch = mock.patch("neutron.agent.linux.external_process." + "LOG.error") + self.error_log = self.log_patch.start() + + self.spawn_patch = mock.patch("eventlet.spawn") + self.eventlent_spawn = self.spawn_patch.start() + + # create a default process monitor + self.create_child_process_monitor('respawn') + + def create_child_process_monitor(self, action): + self.exit_handler = mock.Mock() + conf = mock.Mock() + conf.check_child_processes_action = action + conf.check_child_processes = True + self.pmonitor = external_process.ProcessMonitor( + config=conf, + root_helper=None, + resource_type='test', + exit_handler=self.exit_handler) + + def get_monitored_process_manager(self, uuid, service=None): + self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None) + return self.pmonitor.get_process_manager(uuid, service) + + +class TestProcessMonitor(BaseTestProcessMonitor): + + def test_error_logged(self): + pm = self.get_monitored_process_manager(TEST_UUID) + pm.active = False + self.pmonitor._check_child_processes() + self.assertTrue(self.error_log.called) + + def test_exit_handler(self): + 
self.create_child_process_monitor('exit') + pm = self.get_monitored_process_manager(TEST_UUID) + pm.active = False + self.pmonitor._check_child_processes() + self.exit_handler.assert_called_once_with(TEST_UUID, None) + + def test_different_service_types(self): + pm_none = self.get_monitored_process_manager(TEST_UUID) + pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1) + self.assertNotEqual(pm_none, pm_svc1) + + def test_active_method(self, service=None): + pm = self.get_monitored_process_manager(TEST_UUID, service) + pm.active = False + self.assertFalse(self.pmonitor.is_active(TEST_UUID, service)) + pm.active = True + self.assertTrue(self.pmonitor.is_active(TEST_UUID, service)) + + def test_active_method_with_service(self): + self.test_active_method(TEST_SERVICE1) + + def test_pid_method(self, service=None): + pm = self.get_monitored_process_manager(TEST_UUID, service) + pm.pid = TEST_PID + self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service)) + + def test_pid_method_with_service(self): + # the argument is the service name, not the pid + self.test_pid_method(TEST_SERVICE1) + + def test_pid_method_unknown_uuid(self): + self.assertFalse(self.pmonitor.get_pid('bad-uuid')) diff --git a/neutron/tests/unit/test_dhcp_agent.py b/neutron/tests/unit/test_dhcp_agent.py index 3740199ae..b87abefa7 100644 --- a/neutron/tests/unit/test_dhcp_agent.py +++ b/neutron/tests/unit/test_dhcp_agent.py @@ -665,7 +665,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase): mock.ANY, '--debug', ('--log-file=neutron-ns-metadata-proxy-%s.log' % - fake_meta_network.id)]) + fake_meta_network.id)], addl_env=None) ]) finally: self.external_process_p.start() diff --git a/neutron/tests/unit/test_l3_agent.py b/neutron/tests/unit/test_l3_agent.py index 5e80b66e0..3cd1a6f4d 100644 --- a/neutron/tests/unit/test_l3_agent.py +++ b/neutron/tests/unit/test_l3_agent.py @@ -2179,7 +2179,7 @@ class TestL3AgentEventHandler(base.BaseTestCase): '--debug', '--log-file=neutron-ns-metadata-proxy-%s.log' % router_id - ]) + ], 
addl_env=None) ]) finally: self.external_process_p.start() diff --git a/neutron/tests/unit/test_linux_external_process.py b/neutron/tests/unit/test_linux_external_process.py index da024042a..d915a34e7 100644 --- a/neutron/tests/unit/test_linux_external_process.py +++ b/neutron/tests/unit/test_linux_external_process.py @@ -61,8 +61,8 @@ class TestProcessManager(base.BaseTestCase): name.assert_called_once_with(ensure_pids_dir=True) ip_lib.assert_has_calls([ mock.call.IPWrapper('sudo', 'ns'), - mock.call.IPWrapper().netns.execute(['the', 'cmd'])] - ) + mock.call.IPWrapper().netns.execute(['the', 'cmd'], + addl_env=None)]) def test_enable_with_namespace_process_active(self): callback = mock.Mock()