# under the License.
#
# @author: Mark McClain, DreamHost
+import collections
+import eventlet
from oslo.config import cfg
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
+from neutron.openstack.common.gettextutils import _LE
+from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
+
OPTS = [
cfg.StrOpt('external_pids',
default='$state_path/external/pids',
help=_('Location to store child pid files')),
+ cfg.BoolOpt('check_child_processes', default=False,
+ help=_("Periodically check child processes")),
+ cfg.StrOpt('check_child_processes_action', default='respawn',
+ choices=['respawn', 'exit'],
+ help=_('Action to be executed when a child process dies')),
+ cfg.IntOpt('check_child_processes_interval', default=60,
+ help=_('Interval between checks of child process liveness '
+ '(seconds)')),
]
+
cfg.CONF.register_opts(OPTS)
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, root_helper='sudo',
- namespace=None, service=None, pids_path=None):
+ namespace=None, service=None, pids_path=None,
+ default_cmd_callback=None,
+ cmd_addl_env=None):
+
self.conf = conf
self.uuid = uuid
self.root_helper = root_helper
self.namespace = namespace
+ self.default_cmd_callback = default_cmd_callback
+ self.cmd_addl_env = cmd_addl_env
+ self.pids_path = pids_path or self.conf.external_pids
+
if service:
self.service_pid_fname = 'pid.' + service
+ self.service = service
else:
self.service_pid_fname = 'pid'
- self.pids_path = pids_path or self.conf.external_pids
+ self.service = 'default-service'
- def enable(self, cmd_callback, reload_cfg=False):
+ def enable(self, cmd_callback=None, reload_cfg=False):
if not self.active:
+ if not cmd_callback:
+ cmd_callback = self.default_cmd_callback
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
- ip_wrapper.netns.execute(cmd)
+ ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
elif reload_cfg:
self.reload_cfg()
return self.uuid in f.readline()
except IOError:
return False
+
+
+ServiceId = collections.namedtuple('ServiceId', ['uuid', 'service'])
+
+
+class ProcessMonitor(object):
+
+ def __init__(self, config, root_helper, resource_type, exit_handler):
+ """Handle multiple process managers and watch over all of them.
+
+ :param config: oslo config object with the agent configuration.
+ :type config: oslo.config.ConfigOpts
+ :param root_helper: root helper to be used with new ProcessManagers
+ :type root_helper: str
+ :param resource_type: can be dhcp, router, load_balancer, etc.
+ :type resource_type: str
+ :param exit_handler: function to execute when agent exit has to
+ be executed, it should take care of actual
+ exit
+ :type exit_hanlder: function
+ """
+ self._config = config
+ self._root_helper = root_helper
+ self._resource_type = resource_type
+ self._exit_handler = exit_handler
+
+ self._process_managers = {}
+
+ if self._config.check_child_processes:
+ self._spawn_checking_thread()
+
+ def enable(self, uuid, cmd_callback, namespace=None, service=None,
+ reload_cfg=False, cmd_addl_env=None):
+ """Creates a process and ensures that it is monitored.
+
+ It will create a new ProcessManager and tie it to the uuid/service.
+ """
+ process_manager = ProcessManager(conf=self._config,
+ uuid=uuid,
+ root_helper=self._root_helper,
+ namespace=namespace,
+ service=service,
+ default_cmd_callback=cmd_callback,
+ cmd_addl_env=cmd_addl_env)
+
+ process_manager.enable(reload_cfg=reload_cfg)
+ service_id = ServiceId(uuid, service)
+ self._process_managers[service_id] = process_manager
+
+ def disable(self, uuid, namespace=None, service=None):
+ """Disables the process and stops monitoring it."""
+ service_id = ServiceId(uuid, service)
+ process_manager = self._process_managers.pop(service_id, None)
+
+ # we could be trying to disable a process_manager which was
+ # started on a separate run of this agent, or during netns-cleanup
+ # therefore we won't know about such uuid and we need to
+ # build the process_manager to kill it
+ if not process_manager:
+ process_manager = ProcessManager(conf=self._config,
+ uuid=uuid,
+ root_helper=self._root_helper,
+ namespace=namespace,
+ service=service)
+
+ process_manager.disable()
+
+ def disable_all(self):
+ for service_id in self._process_managers.keys():
+ self.disable(uuid=service_id.uuid, service=service_id.service)
+
+ def get_process_manager(self, uuid, service=None):
+ """Returns a process manager for manipulation"""
+ service_id = ServiceId(uuid, service)
+ return self._process_managers.get(service_id)
+
+ def _get_process_manager_attribute(self, attribute, uuid, service=None):
+ process_manager = self.get_process_manager(uuid, service)
+ if process_manager:
+ return getattr(process_manager, attribute)
+ else:
+ return False
+
+ def is_active(self, uuid, service=None):
+ return self._get_process_manager_attribute('active', uuid, service)
+
+ def get_pid(self, uuid, service=None):
+ return self._get_process_manager_attribute('pid', uuid, service)
+
+ def _spawn_checking_thread(self):
+ eventlet.spawn(self._periodic_checking_thread)
+
+ @lockutils.synchronized("_check_child_processes")
+ def _check_child_processes(self):
+ for service_id in self._process_managers:
+ pm = self._process_managers.get(service_id)
+
+ if pm and not pm.active:
+ LOG.error(_LE("%(service)s for %(resource_type)s "
+ "with uuid %(uuid)s not found. "
+ "The process should not have died"),
+ {'service': pm.service,
+ 'resource_type': self._resource_type,
+ 'uuid': service_id.uuid})
+ self._execute_action(service_id)
+ eventlet.sleep(0)
+
+ def _periodic_checking_thread(self):
+ while True:
+ eventlet.sleep(self._config.check_child_processes_interval)
+ eventlet.spawn(self._check_child_processes)
+
+ def _execute_action(self, service_id):
+ action_function = getattr(
+ self, "_%s_action" % self._config.check_child_processes_action)
+ action_function(service_id)
+
+ def _respawn_action(self, service_id):
+ LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
+ {'service': service_id.service,
+ 'uuid': service_id.uuid})
+ self._process_managers[service_id].enable()
+
+ def _exit_action(self, service_id):
+ LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
+ "actions"))
+ self._exit_handler(service_id.uuid, service_id.service)
--- /dev/null
+# Copyright 2014 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+from oslo.config import cfg
+
+from neutron.agent.linux import daemon
+
+
+def main():
+
+ class SimpleDaemon(daemon.Daemon):
+ """The purpose of this daemon is to serve as an example, and also as
+ a dummy daemon, which can be invoked by functional testing, it
+ does nothing but setting the pid file, and staying detached in the
+ background.
+ """
+
+ def run(self):
+ while True:
+ eventlet.sleep(10)
+
+ eventlet.monkey_patch()
+ opts = [
+ cfg.StrOpt('uuid',
+ help=_('uuid provided from the command line '
+ 'so external_process can track us via /proc/'
+ 'cmdline interface.'),
+ required=True),
+ cfg.StrOpt('pid_file',
+ help=_('Location of pid file of this process.'),
+ required=True)
+ ]
+
+ cfg.CONF.register_cli_opts(opts)
+ # Don't get the default configuration file
+ cfg.CONF(project='neutron', default_config_files=[])
+ simple_daemon = SimpleDaemon(cfg.CONF.pid_file,
+ uuid=cfg.CONF.uuid)
+ simple_daemon.start()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+# Copyright 2014 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+from oslo.config import cfg
+from six import moves
+
+from neutron.agent.linux import external_process
+from neutron.tests.functional.agent.linux import simple_daemon
+from neutron.tests.functional import base
+
+
+UUID_FORMAT = "test-uuid-%d"
+
+
+class BaseTestProcessMonitor(base.BaseSudoTestCase):
+
+ def setUp(self):
+ super(BaseTestProcessMonitor, self).setUp()
+ self._exit_handler_called = False
+ cfg.CONF.set_override('check_child_processes', True)
+ cfg.CONF.set_override('check_child_processes_interval', 1)
+ self._child_processes = []
+ self._ext_processes = None
+ self.addCleanup(self.cleanup_spawned_children)
+
+ def create_child_processes_manager(self, action):
+ cfg.CONF.set_override('check_child_processes_action', action)
+ self._ext_processes = external_process.ProcessMonitor(
+ config=cfg.CONF,
+ root_helper=None,
+ resource_type='test',
+ exit_handler=self._exit_handler)
+
+ def _exit_handler(self, uuid, service):
+ self._exit_handler_called = True
+ self._exit_handler_params = (uuid, service)
+
+ def _make_cmdline_callback(self, uuid):
+ def _cmdline_callback(pidfile):
+ cmdline = ["python", simple_daemon.__file__,
+ "--uuid=%s" % uuid,
+ "--pid_file=%s" % pidfile]
+ return cmdline
+ return _cmdline_callback
+
+ def _spawn_n_children(self, n, service=None):
+ self._child_processes = []
+ for child_number in moves.xrange(n):
+ uuid = self._child_uuid(child_number)
+ _callback = self._make_cmdline_callback(uuid)
+ self._ext_processes.enable(uuid=uuid,
+ cmd_callback=_callback,
+ service=service)
+
+ pm = self._ext_processes.get_process_manager(uuid, service)
+ self._child_processes.append(pm)
+
+ @staticmethod
+ def _child_uuid(child_number):
+ return UUID_FORMAT % child_number
+
+ def _kill_last_child(self):
+ self._child_processes[-1].disable()
+
+ def spawn_child_processes_and_kill_last(self, service=None, number=2):
+ self._spawn_n_children(number, service)
+ self._kill_last_child()
+ self.assertFalse(self._child_processes[-1].active)
+
+ def wait_for_all_childs_respawned(self):
+ def all_childs_active():
+ return all(pm.active for pm in self._child_processes)
+
+ self._wait_for_condition(all_childs_active)
+
+ def _wait_for_condition(self, exit_condition, extra_time=5):
+ # we need to allow extra_time for the check process to happen
+ # and properly execute action over the gone processes under
+ # high load conditions
+ max_wait_time = cfg.CONF.check_child_processes_interval + extra_time
+ with self.assert_max_execution_time(max_wait_time):
+ while not exit_condition():
+ eventlet.sleep(0.01)
+
+ def cleanup_spawned_children(self):
+ if self._ext_processes:
+ self._ext_processes.disable_all()
+
+
+class TestProcessMonitor(BaseTestProcessMonitor):
+
+ def test_respawn_handler(self):
+ self.create_child_processes_manager('respawn')
+ self.spawn_child_processes_and_kill_last()
+ self.wait_for_all_childs_respawned()
--- /dev/null
+# Copyright 2014 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import mock
+
+from neutron.agent.linux import external_process
+from neutron.tests import base
+
+TEST_UUID = 'test-uuid'
+TEST_SERVICE1 = 'testsvc'
+TEST_PID = 1234
+
+
+class BaseTestProcessMonitor(base.BaseTestCase):
+
+ def setUp(self):
+ super(BaseTestProcessMonitor, self).setUp()
+ self.pm_patch = mock.patch("neutron.agent.linux.external_process."
+ "ProcessManager", side_effect=mock.Mock)
+ self.pmanager = self.pm_patch.start()
+
+ self.log_patch = mock.patch("neutron.agent.linux.external_process."
+ "LOG.error")
+ self.error_log = self.log_patch.start()
+
+ self.spawn_patch = mock.patch("eventlet.spawn")
+ self.eventlent_spawn = self.spawn_patch.start()
+
+ # create a default process monitor
+ self.create_child_process_monitor('respawn')
+
+ def create_child_process_monitor(self, action):
+ self.exit_handler = mock.Mock()
+ conf = mock.Mock()
+ conf.check_child_processes_action = action
+ conf.check_child_processes = True
+ self.pmonitor = external_process.ProcessMonitor(
+ config=conf,
+ root_helper=None,
+ resource_type='test',
+ exit_handler=self.exit_handler)
+
+ def get_monitored_process_manager(self, uuid, service=None):
+ self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None)
+ return self.pmonitor.get_process_manager(uuid, service)
+
+
+class TestProcessMonitor(BaseTestProcessMonitor):
+
+ def test_error_logged(self):
+ pm = self.get_monitored_process_manager(TEST_UUID)
+ pm.active = False
+ self.pmonitor._check_child_processes()
+ self.assertTrue(self.error_log.called)
+
+ def test_exit_handler(self):
+ self.create_child_process_monitor('exit')
+ pm = self.get_monitored_process_manager(TEST_UUID)
+ pm.active = False
+ self.pmonitor._check_child_processes()
+ self.exit_handler.assert_called_once_with(TEST_UUID, None)
+
+ def test_different_service_types(self):
+ pm_none = self.get_monitored_process_manager(TEST_UUID)
+ pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
+ self.assertNotEqual(pm_none, pm_svc1)
+
+ def test_active_method(self, service=None):
+ pm = self.get_monitored_process_manager(TEST_UUID, service)
+ pm.active = False
+ self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
+ pm.active = True
+ self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))
+
+ def test_active_method_with_service(self):
+ self.test_active_method(TEST_SERVICE1)
+
+ def test_pid_method(self, service=None):
+ pm = self.get_monitored_process_manager(TEST_UUID, service)
+ pm.pid = TEST_PID
+ self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))
+
+ def test_pid_method_with_service(self):
+ self.test_pid_method(TEST_PID)
+
+ def test_pid_method_unknown_uuid(self):
+ self.assertFalse(self.pmonitor.get_pid('bad-uuid'))
mock.ANY,
'--debug',
('--log-file=neutron-ns-metadata-proxy-%s.log' %
- fake_meta_network.id)])
+ fake_meta_network.id)], addl_env=None)
])
finally:
self.external_process_p.start()
'--debug',
'--log-file=neutron-ns-metadata-proxy-%s.log' %
router_id
- ])
+ ], addl_env=None)
])
finally:
self.external_process_p.start()
name.assert_called_once_with(ensure_pids_dir=True)
ip_lib.assert_has_calls([
mock.call.IPWrapper('sudo', 'ns'),
- mock.call.IPWrapper().netns.execute(['the', 'cmd'])]
- )
+ mock.call.IPWrapper().netns.execute(['the', 'cmd'],
+ addl_env=None)])
def test_enable_with_namespace_process_active(self):
callback = mock.Mock()