review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Implements the ProcessMonitor in the l3_agent
author Miguel Angel Ajo <mangelajo@redhat.com>
Mon, 18 Aug 2014 11:00:58 +0000 (13:00 +0200)
committer Miguel Angel Ajo <mangelajo@redhat.com>
Thu, 29 Jan 2015 20:43:12 +0000 (20:43 +0000)
The ProcessMonitor class will watch over spawned external processes,
taking the administrator-configured action if any of the external
processes dies unexpectedly.

It covers both the neutron-ns-metadata-proxy for non-HA routers
and the IPv6 radvd external processes. Keepalived +
neutron-ns-metadata-proxy need to be covered in a follow-up
patch, once neutron-ns-metadata-proxy is handled by the l3-agent
(instead of keepalived) in the HA routers.

Implements: blueprint agent-child-processes-status

Change-Id: Id6cc4786d837b96c61429d51485bc86ae37872cb

neutron/agent/l3/agent.py
neutron/agent/l3/ha.py
neutron/agent/linux/external_process.py
neutron/agent/linux/ra.py
neutron/agent/linux/utils.py
neutron/agent/metadata/driver.py
neutron/tests/functional/agent/test_l3_agent.py
neutron/tests/unit/test_l3_agent.py
neutron/tests/unit/test_linux_external_process.py

index 102e57edf5a1d659007a267cd16d80c622c07539..db0f578b3a6db520e5470a46255aff36455ec63e 100644 (file)
@@ -29,6 +29,7 @@ from neutron.agent.l3 import ha
 from neutron.agent.l3 import ha_router
 from neutron.agent.l3 import legacy_router
 from neutron.agent.l3 import router_processing_queue as queue
+from neutron.agent.linux import external_process
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ra
 from neutron.agent.metadata import driver as metadata_driver
@@ -148,6 +149,11 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
 
         self._check_config_params()
 
+        self.process_monitor = external_process.ProcessMonitor(
+            config=self.conf,
+            root_helper=self.root_helper,
+            resource_type='router')
+
         try:
             self.driver = importutils.import_object(
                 self.conf.interface_driver,
@@ -281,7 +287,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
 
     def _destroy_router_namespace(self, ns):
         router_id = self.get_router_id(ns)
-        ra.disable_ipv6_ra(router_id, ns, self.root_helper)
+        ra.disable_ipv6_ra(router_id, self.process_monitor)
         ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=ns)
         for d in ns_ip.get_devices(exclude_loopback=True):
             if d.name.startswith(INTERNAL_DEV_PREFIX):
@@ -450,7 +456,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                               ri.ns_name,
                               internal_ports,
                               self.get_internal_device_name,
-                              self.root_helper)
+                              self.process_monitor)
 
         existing_devices = self._get_existing_devices(ri)
         current_internal_devs = set([n for n in existing_devices
index 335678c8ade27a81ef572fe9676aecdf1e519464..9e902d4a38162618b0113bd917e7d91fcda7001f 100644 (file)
@@ -147,12 +147,15 @@ class AgentMixin(object):
         callback = (
             metadata_driver.MetadataDriver._get_metadata_proxy_callback(
                 ri.router_id, self.conf))
+        # TODO(mangelajo): use the process monitor in keepalived when
+        #                  keepalived stops killing/starting metadata
+        #                  proxy on its own
         pm = (
             metadata_driver.MetadataDriver.
             _get_metadata_proxy_process_manager(ri.router_id,
                                                 ri.ns_name,
                                                 self.conf))
-        pid = pm.get_pid_file_name(ensure_pids_dir=True)
+        pid = pm.get_pid_file_name()
         ri.keepalived_manager.add_notifier(
             callback(pid), 'master', ri.ha_vr_id)
         for state in ('backup', 'fault'):
index f8c5220a9d94534ae76ef5e003161a7a62aadcf7..341a7503d30e62daa36db5d4f0c2ed593be98ecd 100644 (file)
@@ -66,11 +66,13 @@ class ProcessManager(object):
             self.service_pid_fname = 'pid'
             self.service = 'default-service'
 
+        utils.ensure_dir(os.path.dirname(self.get_pid_file_name()))
+
     def enable(self, cmd_callback=None, reload_cfg=False):
         if not self.active:
             if not cmd_callback:
                 cmd_callback = self.default_cmd_callback
-            cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
+            cmd = cmd_callback(self.get_pid_file_name())
 
             ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
             ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
@@ -96,17 +98,14 @@ class ProcessManager(object):
         else:
             LOG.debug('No process started for %s', self.uuid)
 
-    def get_pid_file_name(self, ensure_pids_dir=False):
+    def get_pid_file_name(self):
         """Returns the file name for a given kind of config file."""
         if self.pid_file:
-            if ensure_pids_dir:
-                utils.ensure_dir(os.path.dirname(self.pid_file))
             return self.pid_file
         else:
             return utils.get_conf_file_name(self.pids_path,
                                             self.uuid,
-                                            self.service_pid_fname,
-                                            ensure_pids_dir)
+                                            self.service_pid_fname)
 
     @property
     def pid(self):
@@ -223,6 +222,11 @@ class ProcessMonitor(object):
             service=service,
             pid_file=pid_file).pid
 
+    def get_pid_file_name(self, uuid, service=None):
+        return self._ensure_process_manager(
+            uuid=uuid,
+            service=service).get_pid_file_name()
+
     def _ensure_process_manager(self, uuid, cmd_callback=None,
                                 namespace=None, service=None,
                                 cmd_addl_env=None,
index af721cca1b4d64fc80b60d444b93567d941821a8..4044ca0ab0e88752489a9b42c25a7d880cbb2512 100644 (file)
@@ -18,12 +18,14 @@ import netaddr
 from oslo.config import cfg
 import six
 
-from neutron.agent.linux import external_process
 from neutron.agent.linux import utils
 from neutron.common import constants
 from neutron.openstack.common import log as logging
 
 
+RADVD_SERVICE_NAME = 'radvd'
+RADVD_SERVICE_CMD = 'radvd'
+
 LOG = logging.getLogger(__name__)
 
 OPTS = [
@@ -76,49 +78,43 @@ def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
     return radvd_conf
 
 
-def _spawn_radvd(router_id, radvd_conf, router_ns, root_helper):
+def _spawn_radvd(router_id, radvd_conf, router_ns, process_monitor):
     def callback(pid_file):
         # we need to use -m syslog and f.e. not -m stderr (the default)
         # or -m stderr_syslog so that radvd 2.0+ will close stderr and
         # exit after daemonization; otherwise, the current thread will
         # be locked waiting for result from radvd that won't ever come
         # until the process dies
-        radvd_cmd = ['radvd',
+        radvd_cmd = [RADVD_SERVICE_CMD,
                      '-C', '%s' % radvd_conf,
                      '-p', '%s' % pid_file,
                      '-m', 'syslog']
         return radvd_cmd
 
-    radvd = external_process.ProcessManager(cfg.CONF,
-                                            router_id,
-                                            root_helper,
-                                            router_ns,
-                                            'radvd')
-    radvd.enable(callback, True)
+    process_monitor.enable(uuid=router_id,
+                           cmd_callback=callback,
+                           namespace=router_ns,
+                           service=RADVD_SERVICE_NAME,
+                           reload_cfg=True)
     LOG.debug("radvd enabled for router %s", router_id)
 
 
 def enable_ipv6_ra(router_id, router_ns, router_ports,
-                   dev_name_helper, root_helper):
+                   dev_name_helper, process_monitor):
     for p in router_ports:
         if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
             break
     else:
         # Kill the daemon if it's running
-        disable_ipv6_ra(router_id, router_ns, root_helper)
+        disable_ipv6_ra(router_id, process_monitor)
         return
 
     LOG.debug("Enable IPv6 RA for router %s", router_id)
     radvd_conf = _generate_radvd_conf(router_id, router_ports, dev_name_helper)
-    _spawn_radvd(router_id, radvd_conf, router_ns, root_helper)
+    _spawn_radvd(router_id, radvd_conf, router_ns, process_monitor)
 
 
-def disable_ipv6_ra(router_id, router_ns, root_helper):
-    radvd = external_process.ProcessManager(cfg.CONF,
-                                            router_id,
-                                            root_helper,
-                                            router_ns,
-                                            'radvd')
-    radvd.disable()
+def disable_ipv6_ra(router_id, process_monitor):
+    process_monitor.disable(router_id, service=RADVD_SERVICE_NAME)
     utils.remove_conf_files(cfg.CONF.ra_confs, router_id)
     LOG.debug("radvd disabled for router %s", router_id)
index 2d7aa616f8dccd30305b0ab93c677ecfba73fddb..17543acbf67a58b94789e9790025f6cbdf0e6334 100644 (file)
@@ -141,6 +141,8 @@ def ensure_dir(dir_path):
 
 
 def _get_conf_base(cfg_root, uuid, ensure_conf_dir):
+    #TODO(mangelajo): separate responsibilities here, ensure_conf_dir
+    #                 should be a separate function
     conf_dir = os.path.abspath(os.path.normpath(cfg_root))
     conf_base = os.path.join(conf_dir, uuid)
     if ensure_conf_dir:
index da5b3eb510c7f4173388df08f63aaa8bdf7801c6..650760cc4a70de93490a89df94f6a0a98644f8bb 100644 (file)
@@ -36,9 +36,8 @@ class MetadataDriver(advanced_service.AdvancedService):
         router.iptables_manager.apply()
 
         if not router.is_ha:
-            self._spawn_metadata_proxy(router.router_id,
-                                       router.ns_name,
-                                       self.l3_agent.conf)
+            self._spawn_monitored_metadata_proxy(router.router_id,
+                                                 router.ns_name)
 
     def before_router_removed(self, router):
         for c, r in self.metadata_filter_rules(self.metadata_port):
@@ -47,9 +46,8 @@ class MetadataDriver(advanced_service.AdvancedService):
             router.iptables_manager.ipv4['nat'].remove_rule(c, r)
         router.iptables_manager.apply()
 
-        self._destroy_metadata_proxy(router.router['id'],
-                                     router.ns_name,
-                                     self.l3_agent.conf)
+        self._destroy_monitored_metadata_proxy(router.router['id'],
+                                               router.ns_name)
 
     @classmethod
     def metadata_filter_rules(cls, port):
@@ -89,6 +87,17 @@ class MetadataDriver(advanced_service.AdvancedService):
 
         return callback
 
+    def _spawn_monitored_metadata_proxy(self, router_id, ns_name):
+        callback = self._get_metadata_proxy_callback(
+            router_id, self.l3_agent.conf)
+        self.l3_agent.process_monitor.enable(router_id, callback, ns_name)
+
+    def _destroy_monitored_metadata_proxy(self, router_id, ns_name):
+        self.l3_agent.process_monitor.disable(router_id, ns_name)
+
+    # TODO(mangelajo): remove the unmonitored _get_*_process_manager,
+    #                  _spawn_* and _destroy* when keepalived stops
+    #                  spawning and killing proxies on its own.
     @classmethod
     def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf):
         return external_process.ProcessManager(
index 7ce844a2131b8cc20bf471844468d0228f2c7df6..2dcc7abfec0b84377e3db47592c3a1026588d8d4 100755 (executable)
@@ -63,6 +63,7 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
         config.register_cli_opts(logging.logging_cli_opts)
         config.register_opts(logging.generic_log_opts)
         config.register_opts(logging.log_opts)
+        agent_config.register_process_monitor_opts(config)
         return config
 
     def _configure_agent(self, host):
index 7a1a067e8031e872336859378420e99cc75440be..c5a56eed7cdb07cab104169b75b2dea30bced0da 100644 (file)
@@ -19,7 +19,6 @@ import eventlet
 
 import mock
 import netaddr
-from oslo.config import cfg
 from oslo import messaging
 from testtools import matchers
 
@@ -31,6 +30,7 @@ from neutron.agent.l3 import dvr_router
 from neutron.agent.l3 import ha
 from neutron.agent.l3 import link_local_allocator as lla
 from neutron.agent.l3 import router_info as l3router
+from neutron.agent.linux import external_process
 from neutron.agent.linux import interface
 from neutron.agent.linux import ra
 from neutron.agent.metadata import driver as metadata_driver
@@ -180,7 +180,9 @@ class TestBasicRouterOperations(base.BaseTestCase):
         agent_config.register_interface_driver_opts_helper(self.conf)
         agent_config.register_use_namespaces_opts_helper(self.conf)
         agent_config.register_root_helper(self.conf)
+        agent_config.register_process_monitor_opts(self.conf)
         self.conf.register_opts(interface.OPTS)
+        self.conf.register_opts(external_process.OPTS)
         self.conf.set_override('router_id', 'fake_id')
         self.conf.set_override('interface_driver',
                                'neutron.agent.linux.interface.NullDriver')
@@ -1276,23 +1278,34 @@ class TestBasicRouterOperations(base.BaseTestCase):
         self.assertFalse(nat_rules_delta)
         return ri
 
-    def _expected_call_lookup_ri_process(self, ri, process):
+    def _expected_call_lookup_ri_process_enabled(self, ri, process):
         """Expected call if a process is looked up in a router instance."""
-        return [mock.call(cfg.CONF,
-                          ri.router['id'],
-                          self.conf.root_helper,
-                          ri.ns_name,
-                          process)]
+        return [mock.call(uuid=ri.router['id'],
+                          service=process,
+                          default_cmd_callback=mock.ANY,
+                          namespace=ri.ns_name,
+                          root_helper=self.conf.root_helper,
+                          conf=self.conf,
+                          pid_file=None,
+                          cmd_addl_env=None)]
+
+    def _expected_call_lookup_ri_process_disabled(self, ri, process):
+        """Expected call if a process is looked up in a router instance."""
+        # The ProcessManager does already exist, and it's found via
+        # ProcessMonitor lookup _ensure_process_manager
+        return [mock.call().__nonzero__()]
 
     def _assert_ri_process_enabled(self, ri, process):
         """Verify that process was enabled for a router instance."""
-        expected_calls = self._expected_call_lookup_ri_process(ri, process)
-        expected_calls.append(mock.call().enable(mock.ANY, True))
+        expected_calls = self._expected_call_lookup_ri_process_enabled(
+            ri, process)
+        expected_calls.append(mock.call().enable(reload_cfg=True))
         self.assertEqual(expected_calls, self.external_process.mock_calls)
 
     def _assert_ri_process_disabled(self, ri, process):
         """Verify that process was disabled for a router instance."""
-        expected_calls = self._expected_call_lookup_ri_process(ri, process)
+        expected_calls = self._expected_call_lookup_ri_process_disabled(
+            ri, process)
         expected_calls.append(mock.call().disable())
         self.assertEqual(expected_calls, self.external_process.mock_calls)
 
@@ -1677,20 +1690,18 @@ class TestBasicRouterOperations(base.BaseTestCase):
                   'distributed': False}
         driver = metadata_driver.MetadataDriver
         with mock.patch.object(
-            driver, '_destroy_metadata_proxy') as destroy_proxy:
+            driver, '_destroy_monitored_metadata_proxy') as destroy_proxy:
             with mock.patch.object(
-                driver, '_spawn_metadata_proxy') as spawn_proxy:
+                driver, '_spawn_monitored_metadata_proxy') as spawn_proxy:
                 agent._process_added_router(router)
                 if enableflag:
                     spawn_proxy.assert_called_with(router_id,
-                                                   mock.ANY,
                                                    mock.ANY)
                 else:
                     self.assertFalse(spawn_proxy.call_count)
                 agent._router_removed(router_id)
                 if enableflag:
                     destroy_proxy.assert_called_with(router_id,
-                                                     mock.ANY,
                                                      mock.ANY)
                 else:
                     self.assertFalse(destroy_proxy.call_count)
@@ -2195,15 +2206,17 @@ class TestBasicRouterOperations(base.BaseTestCase):
         self.external_process_p.stop()
         self.ip_cls_p.stop()
 
+        ensure_dir = 'neutron.agent.linux.utils.ensure_dir'
         get_pid_file_name = ('neutron.agent.linux.external_process.'
                              'ProcessManager.get_pid_file_name')
         with mock.patch('neutron.agent.linux.utils.execute') as execute:
             with mock.patch(get_pid_file_name) as get_pid:
-                get_pid.return_value = pidfile
-                ra._spawn_radvd(router['id'],
-                                conffile,
-                                agent.get_ns_name(router['id']),
-                                self.conf.root_helper)
+                with mock.patch(ensure_dir) as ensure_dir:
+                    get_pid.return_value = pidfile
+                    ra._spawn_radvd(router['id'],
+                                    conffile,
+                                    agent.get_ns_name(router['id']),
+                                    agent.process_monitor)
             cmd = execute.call_args[0][0]
 
         self.assertIn('radvd', cmd)
index 1adb06900d65953491797b1d1e2edbd6eb6985ad..f208c362b4bd544450a3330f91b7e629ae7a71fd 100644 (file)
@@ -13,6 +13,7 @@
 #    under the License.
 
 import mock
+import os.path
 
 from neutron.agent.linux import external_process as ep
 from neutron.tests import base
@@ -25,10 +26,16 @@ class TestProcessManager(base.BaseTestCase):
         self.execute = self.execute_p.start()
         self.delete_if_exists = mock.patch(
             'neutron.openstack.common.fileutils.delete_if_exists').start()
+        self.makedirs = mock.patch('os.makedirs').start()
 
         self.conf = mock.Mock()
         self.conf.external_pids = '/var/path'
 
+    def test_processmanager_ensures_pid_dir(self):
+        pid_file = os.path.join(self.conf.external_pids, 'pid')
+        ep.ProcessManager(self.conf, 'uuid', pid_file=pid_file)
+        self.makedirs.assert_called_once_with(self.conf.external_pids, 0o755)
+
     def test_enable_no_namespace(self):
         callback = mock.Mock()
         callback.return_value = ['the', 'cmd']
@@ -41,7 +48,6 @@ class TestProcessManager(base.BaseTestCase):
                 manager = ep.ProcessManager(self.conf, 'uuid')
                 manager.enable(callback)
                 callback.assert_called_once_with('pidfile')
-                name.assert_called_once_with(ensure_pids_dir=True)
                 self.execute.assert_called_once_with(['the', 'cmd'],
                                                      root_helper='sudo',
                                                      check_exit_code=True,
@@ -60,7 +66,6 @@ class TestProcessManager(base.BaseTestCase):
                 with mock.patch.object(ep, 'ip_lib') as ip_lib:
                     manager.enable(callback)
                     callback.assert_called_once_with('pidfile')
-                    name.assert_called_once_with(ensure_pids_dir=True)
                     ip_lib.assert_has_calls([
                         mock.call.IPWrapper('sudo', 'ns'),
                         mock.call.IPWrapper().netns.execute(['the', 'cmd'],
@@ -121,29 +126,10 @@ class TestProcessManager(base.BaseTestCase):
                     manager.disable()
                     debug.assert_called_once_with(mock.ANY, mock.ANY)
 
-    def test_get_pid_file_name_existing(self):
-        with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
-            isdir.return_value = True
-            manager = ep.ProcessManager(self.conf, 'uuid')
-            retval = manager.get_pid_file_name(ensure_pids_dir=True)
-            self.assertEqual(retval, '/var/path/uuid.pid')
-
-    def test_get_pid_file_name_not_existing(self):
-        with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
-            with mock.patch.object(ep.utils.os, 'makedirs') as makedirs:
-                isdir.return_value = False
-                manager = ep.ProcessManager(self.conf, 'uuid')
-                retval = manager.get_pid_file_name(ensure_pids_dir=True)
-                self.assertEqual(retval, '/var/path/uuid.pid')
-                makedirs.assert_called_once_with('/var/path', 0o755)
-
     def test_get_pid_file_name_default(self):
-        with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
-            isdir.return_value = True
-            manager = ep.ProcessManager(self.conf, 'uuid')
-            retval = manager.get_pid_file_name(ensure_pids_dir=False)
-            self.assertEqual(retval, '/var/path/uuid.pid')
-            self.assertFalse(isdir.called)
+        manager = ep.ProcessManager(self.conf, 'uuid')
+        retval = manager.get_pid_file_name()
+        self.assertEqual(retval, '/var/path/uuid.pid')
 
     def test_pid(self):
         with mock.patch('__builtin__.open') as mock_open: