]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Refactor the ProcessMonitor API
authorMiguel Angel Ajo <mangelajo@redhat.com>
Tue, 10 Feb 2015 12:59:03 +0000 (12:59 +0000)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 6 Mar 2015 11:09:21 +0000 (11:09 +0000)
Refactor the ProcessMonitor API to reduce coupling to
ProcessManager, and stop replicating the whole API and
behavior of ProcessManager.

We introduced an abstract MonitoredProcess class to reduce
coupling, and allow other kinds of external processes
to be monitored too.

Partially Implements: blueprint agent-child-processes-status
Co-Authored-By: Mike Kolesnik <mkolesni@redhat.com>
Change-Id: I0da6071037f9728cc20403324e36c32116bcf00d

12 files changed:
neutron/agent/dhcp/agent.py
neutron/agent/linux/dhcp.py
neutron/agent/linux/external_process.py
neutron/agent/linux/ra.py
neutron/agent/metadata/driver.py
neutron/cmd/netns_cleanup.py
neutron/tests/functional/agent/linux/test_process_monitor.py
neutron/tests/unit/agent/linux/test_process_monitor.py
neutron/tests/unit/test_dhcp_agent.py
neutron/tests/unit/test_l3_agent.py
neutron/tests/unit/test_linux_dhcp.py
neutron/tests/unit/test_netns_cleanup.py

index 1cf7baae4252c69f38a00ef01e0ded1872afba24..8f0647c8992f817001d24d03aaaad849cb0331e6 100644 (file)
@@ -361,7 +361,7 @@ class DhcpAgent(manager.Manager):
 
     def disable_isolated_metadata_proxy(self, network):
         metadata_driver.MetadataDriver.destroy_monitored_metadata_proxy(
-            self._process_monitor, network.id, network.namespace)
+            self._process_monitor, network.id, network.namespace, self.conf)
 
 
 class DhcpPluginApi(object):
index 6b8d034d30febb5e69651f9b89339910596eca96..3894676b66d305961e870d103d20566507084666 100644 (file)
@@ -23,13 +23,14 @@ import netaddr
 from oslo_utils import importutils
 import six
 
+from neutron.agent.linux import external_process
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
 from neutron.common import constants
 from neutron.common import exceptions
 from neutron.common import ipv6_utils
 from neutron.common import utils as commonutils
-from neutron.i18n import _LE, _LI
+from neutron.i18n import _LE, _LI, _LW
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import uuidutils
 
@@ -206,31 +207,37 @@ class DhcpLocalProcess(DhcpBase):
             self.interface_name = interface_name
             self.spawn_process()
 
+    def _get_process_manager(self, cmd_callback=None):
+        return external_process.ProcessManager(
+            conf=self.conf,
+            uuid=self.network.id,
+            namespace=self.network.namespace,
+            default_cmd_callback=cmd_callback,
+            pid_file=self.get_conf_file_name('pid'))
+
     def disable(self, retain_port=False):
         """Disable DHCP for this network by killing the local process."""
-        pid_filename = self.get_conf_file_name('pid')
-
-        pid = self.process_monitor.get_pid(uuid=self.network.id,
-                                           service=DNSMASQ_SERVICE_NAME,
-                                           pid_file=pid_filename)
-
-        self.process_monitor.disable(uuid=self.network.id,
-                                     namespace=self.network.namespace,
-                                     service=DNSMASQ_SERVICE_NAME,
-                                     pid_file=pid_filename)
-        if pid and not retain_port:
-            self.device_manager.destroy(self.network, self.interface_name)
+        self.process_monitor.unregister(self.network.id, DNSMASQ_SERVICE_NAME)
+        self._get_process_manager().disable()
 
         self._remove_config_files()
-
         if not retain_port:
-            if self.conf.dhcp_delete_namespaces and self.network.namespace:
-                ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
-                try:
-                    ns_ip.netns.delete(self.network.namespace)
-                except RuntimeError:
-                    LOG.exception(_LE('Failed trying to delete namespace: %s'),
-                                  self.network.namespace)
+            self._destroy_namespace_and_port()
+
+    def _destroy_namespace_and_port(self):
+        try:
+            self.device_manager.destroy(self.network, self.interface_name)
+        except RuntimeError:
+            LOG.warning(_LW('Failed trying to delete interface: %s'),
+                        self.interface_name)
+
+        if self.conf.dhcp_delete_namespaces and self.network.namespace:
+            ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
+            try:
+                ns_ip.netns.delete(self.network.namespace)
+            except RuntimeError:
+                LOG.warning(_LW('Failed trying to delete namespace: %s'),
+                            self.network.namespace)
 
     def _get_value_from_conf_file(self, kind, converter=None):
         """A helper function to read a value from one of the state files."""
@@ -260,10 +267,7 @@ class DhcpLocalProcess(DhcpBase):
 
     @property
     def active(self):
-        pid_filename = self.get_conf_file_name('pid')
-        return self.process_monitor.is_active(self.network.id,
-                                              DNSMASQ_SERVICE_NAME,
-                                              pid_file=pid_filename)
+        return self._get_process_manager().active
 
     @abc.abstractmethod
     def spawn_process(self):
@@ -383,15 +387,14 @@ class Dnsmasq(DhcpLocalProcess):
 
         self._output_config_files()
 
-        pid_filename = self.get_conf_file_name('pid')
+        pm = self._get_process_manager(
+            cmd_callback=self._build_cmdline_callback)
 
-        self.process_monitor.enable(
-            uuid=self.network.id,
-            cmd_callback=self._build_cmdline_callback,
-            namespace=self.network.namespace,
-            service=DNSMASQ_SERVICE_NAME,
-            reload_cfg=reload_with_HUP,
-            pid_file=pid_filename)
+        pm.enable(reload_cfg=reload_with_HUP)
+
+        self.process_monitor.register(uuid=self.network.id,
+                                      service_name=DNSMASQ_SERVICE_NAME,
+                                      monitored_process=pm)
 
     def _release_lease(self, mac_address, ip):
         """Release a DHCP lease."""
index 4159ac8d51b31c7b3df9209c1105a010cab988f8..abed5ba42d477dbfe0b3fffef4f2d44878658bd1 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import abc
 import collections
 import os.path
+import six
 
 import eventlet
 from oslo_concurrency import lockutils
@@ -40,7 +42,18 @@ cfg.CONF.register_opts(OPTS)
 agent_cfg.register_process_monitor_opts(cfg.CONF)
 
 
-class ProcessManager(object):
+@six.add_metaclass(abc.ABCMeta)
+class MonitoredProcess(object):
+    @abc.abstractproperty
+    def active(self):
+        """Boolean representing the running state of the process."""
+
+    @abc.abstractmethod
+    def enable(self):
+        """Enable the service, or respawn the process."""
+
+
+class ProcessManager(MonitoredProcess):
     """An external process manager for Neutron spawned processes.
 
     Note: The manager expects uuid to be in cmdline.
@@ -140,118 +153,57 @@ class ProcessMonitor(object):
         self._config = config
         self._resource_type = resource_type
 
-        self._process_managers = {}
+        self._monitored_processes = {}
 
         if self._config.AGENT.check_child_processes_interval:
             self._spawn_checking_thread()
 
-    def enable(self, uuid, cmd_callback, namespace=None, service=None,
-               reload_cfg=False, cmd_addl_env=None, pid_file=None):
-        """Creates a process manager and ensures that it is monitored.
-
-        It will create a new ProcessManager and tie it to the uuid/service
-        with the new settings, replacing the old one if it existed already.
-
-        :param uuid: UUID of the resource this process will serve for.
-        :param cmd_callback: Callback function that receives a pid_file
-                             location and returns a list with the command
-                             and arguments.
-        :param namespace: network namespace to run the process in, if
-                          necessary.
-        :param service: a logical name for the service this process provides,
-                        it will extend the pid file like pid.%(service)s.
-        :param reload_cfg: if the process is active send a HUP signal
-                           for configuration reload, otherwise spawn it
-                           normally.
-        :param cmd_addl_env: additional environment variables for the
-                             spawned process.
-        :param pid_file: the pid file to store the pid of the external
-                         process. If not provided, a default will be used.
+    def register(self, uuid, service_name, monitored_process):
+        """Start monitoring a process.
+
+        The given monitored_process will be tied to it's uuid+service_name
+        replacing the old one if it existed already.
+
+        The monitored_process should be enabled before registration,
+        otherwise ProcessMonitor could try to enable the process itself,
+        which could lead to double enable and if unlucky enough, two processes
+        running, and also errors in the logs.
+
+        :param uuid: An ID of the resource for which the process is running.
+        :param service_name: A logical service name for this process monitor,
+                             so the same uuid provided via process manager
+                             can reference several different services.
+        :param monitored_process: MonitoredProcess we want to monitor.
         """
-        process_manager = self._create_process_manager(
-            uuid=uuid,
-            cmd_callback=cmd_callback,
-            namespace=namespace,
-            service=service,
-            cmd_addl_env=cmd_addl_env,
-            pid_file=pid_file)
-
-        process_manager.enable(reload_cfg=reload_cfg)
-        service_id = ServiceId(uuid, service)
-
-        # replace the old process manager with the new one
-        self._process_managers[service_id] = process_manager
-
-    def disable(self, uuid, namespace=None, service=None,
-                pid_file=None):
-        """Disables the process and stops monitoring it."""
-        service_id = ServiceId(uuid, service)
-
-        process_manager = self._ensure_process_manager(
-            uuid=uuid,
-            service=service,
-            pid_file=pid_file,
-            namespace=namespace)
-        self._process_managers.pop(service_id, None)
-
-        process_manager.disable()
-
-    def disable_all(self):
-        for service_id in self._process_managers.keys():
-            self.disable(uuid=service_id.uuid, service=service_id.service)
-
-    def get_process_manager(self, uuid, service=None):
-        """Returns a process manager for manipulation"""
-        service_id = ServiceId(uuid, service)
-        return self._process_managers.get(service_id)
-
-    def is_active(self, uuid, service=None, pid_file=None):
-        return self._ensure_process_manager(
-            uuid=uuid,
-            service=service,
-            pid_file=pid_file).active
-
-    def get_pid(self, uuid, service=None, pid_file=None):
-        return self._ensure_process_manager(
-            uuid=uuid,
-            service=service,
-            pid_file=pid_file).pid
-
-    def get_pid_file_name(self, uuid, service=None):
-        return self._ensure_process_manager(
-            uuid=uuid,
-            service=service).get_pid_file_name()
-
-    def _ensure_process_manager(self, uuid, cmd_callback=None,
-                                namespace=None, service=None,
-                                cmd_addl_env=None,
-                                pid_file=None,
-                                ):
-
-        process_manager = self.get_process_manager(uuid, service)
-        if not process_manager:
-            # if the process existed in a different run of the agent
-            # provide one, generally for pid / active evaluation
-            process_manager = self._create_process_manager(
-                uuid=uuid,
-                cmd_callback=cmd_callback,
-                namespace=namespace,
-                service=service,
-                cmd_addl_env=cmd_addl_env,
-                pid_file=pid_file)
-        return process_manager
-
-    def _create_process_manager(self, uuid, cmd_callback, namespace, service,
-                                cmd_addl_env, pid_file):
-        return ProcessManager(conf=self._config,
-                              uuid=uuid,
-                              namespace=namespace,
-                              service=service,
-                              default_cmd_callback=cmd_callback,
-                              cmd_addl_env=cmd_addl_env,
-                              pid_file=pid_file)
+
+        service_id = ServiceId(uuid, service_name)
+        self._monitored_processes[service_id] = monitored_process
+
+    def unregister(self, uuid, service_name):
+        """Stop monitoring a process.
+
+        The uuid+service_name will be removed from the monitored processes.
+
+        The service must be disabled **after** unregistering, otherwise if
+        process monitor checks after you disable the process, and before
+        you unregister it, the process will be respawned, and left orphaned
+        into the system.
+
+        :param uuid: An ID of the resource for which the process is running.
+        :param service_name: A logical service name for this process monitor,
+                             so the same uuid provided via process manager
+                             can reference several different services.
+        """
+
+        service_id = ServiceId(uuid, service_name)
+        self._monitored_processes.pop(service_id, None)
+
+    def stop(self):
+        """Stop the process monitoring. """
+        self._monitor_processes = False
 
     def _spawn_checking_thread(self):
+        self._monitor_processes = True
         eventlet.spawn(self._periodic_checking_thread)
 
     @lockutils.synchronized("_check_child_processes")
@@ -259,8 +211,8 @@ class ProcessMonitor(object):
         # we build the list of keys before iterating in the loop to cover
         # the case where other threads add or remove items from the
         # dictionary which otherwise will cause a RuntimeError
-        for service_id in list(self._process_managers):
-            pm = self._process_managers.get(service_id)
+        for service_id in list(self._monitored_processes):
+            pm = self._monitored_processes.get(service_id)
 
             if pm and not pm.active:
                 LOG.error(_LE("%(service)s for %(resource_type)s "
@@ -273,7 +225,7 @@ class ProcessMonitor(object):
             eventlet.sleep(0)
 
     def _periodic_checking_thread(self):
-        while True:
+        while self._monitor_processes:
             eventlet.sleep(self._config.AGENT.check_child_processes_interval)
             eventlet.spawn(self._check_child_processes)
 
@@ -286,7 +238,7 @@ class ProcessMonitor(object):
         LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
                   {'service': service_id.service,
                    'uuid': service_id.uuid})
-        self._process_managers[service_id].enable()
+        self._monitored_processes[service_id].enable()
 
     def _exit_action(self, service_id):
         LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
index 1c757682f5815a94536b037cf525abae29d0d5c9..ef8346e4588afd90cf97e425b0750af8a77554b1 100644 (file)
@@ -18,6 +18,7 @@ import netaddr
 from oslo_config import cfg
 import six
 
+from neutron.agent.linux import external_process
 from neutron.agent.linux import utils
 from neutron.common import constants
 from neutron.openstack.common import log as logging
@@ -90,6 +91,14 @@ class DaemonMonitor(object):
         utils.replace_file(radvd_conf, buf.getvalue())
         return radvd_conf
 
+    def _get_radvd_process_manager(self, callback=None):
+        return external_process.ProcessManager(
+            uuid=self._router_id,
+            default_cmd_callback=callback,
+            namespace=self._router_ns,
+            service=RADVD_SERVICE_NAME,
+            conf=cfg.CONF)
+
     def _spawn_radvd(self, radvd_conf):
         def callback(pid_file):
             # we need to use -m syslog and f.e. not -m stderr (the default)
@@ -103,11 +112,11 @@ class DaemonMonitor(object):
                          '-m', 'syslog']
             return radvd_cmd
 
-        self._process_monitor.enable(uuid=self._router_id,
-                                     cmd_callback=callback,
-                                     namespace=self._router_ns,
-                                     service=RADVD_SERVICE_NAME,
-                                     reload_cfg=True)
+        pm = self._get_radvd_process_manager(callback)
+        pm.enable(reload_cfg=True)
+        self._process_monitor.register(uuid=self._router_id,
+                                       service_name=RADVD_SERVICE_NAME,
+                                       monitored_process=pm)
         LOG.debug("radvd enabled for router %s", self._router_id)
 
     def enable(self, router_ports):
@@ -124,12 +133,13 @@ class DaemonMonitor(object):
         self._spawn_radvd(radvd_conf)
 
     def disable(self):
-        self._process_monitor.disable(self._router_id,
-                                      service=RADVD_SERVICE_NAME)
+        self._process_monitor.unregister(uuid=self._router_id,
+                                         service_name=RADVD_SERVICE_NAME)
+        pm = self._get_radvd_process_manager()
+        pm.disable()
         utils.remove_conf_files(cfg.CONF.ra_confs, self._router_id)
         LOG.debug("radvd disabled for router %s", self._router_id)
 
     @property
     def enabled(self):
-        return self._process_monitor.is_active(self._router_id,
-                                               RADVD_SERVICE_NAME)
+        return self._get_radvd_process_manager().active
index c908e5581dc377243f170c5a1fe686f336ad9925..5f64b1955cfc4b083e84a5c318839e454e61c38b 100644 (file)
@@ -27,6 +27,7 @@ LOG = logging.getLogger(__name__)
 
 # Access with redirection to metadata proxy iptables mark mask
 METADATA_ACCESS_MARK_MASK = '0xffffffff'
+METADATA_SERVICE_NAME = 'metadata-proxy'
 
 
 class MetadataDriver(advanced_service.AdvancedService):
@@ -83,7 +84,8 @@ class MetadataDriver(advanced_service.AdvancedService):
 
         self.destroy_monitored_metadata_proxy(self.l3_agent.process_monitor,
                                               router.router['id'],
-                                              router.ns_name)
+                                              router.ns_name,
+                                              self.l3_agent.conf)
 
     @classmethod
     def metadata_filter_rules(cls, port, mark):
@@ -143,20 +145,25 @@ class MetadataDriver(advanced_service.AdvancedService):
     @classmethod
     def spawn_monitored_metadata_proxy(cls, monitor, ns_name, port, conf,
                                        network_id=None, router_id=None):
+        uuid = network_id or router_id
         callback = cls._get_metadata_proxy_callback(
             port, conf, network_id=network_id, router_id=router_id)
-        monitor.enable(network_id or router_id, callback, ns_name)
+        pm = cls._get_metadata_proxy_process_manager(uuid, ns_name, conf,
+                                                     callback=callback)
+        pm.enable()
+        monitor.register(uuid, METADATA_SERVICE_NAME, pm)
 
     @classmethod
-    def destroy_monitored_metadata_proxy(cls, monitor, uuid, ns_name):
-        monitor.disable(uuid, ns_name)
+    def destroy_monitored_metadata_proxy(cls, monitor, uuid, ns_name, conf):
+        monitor.unregister(uuid, METADATA_SERVICE_NAME)
+        pm = cls._get_metadata_proxy_process_manager(uuid, ns_name, conf)
+        pm.disable()
 
-    # TODO(mangelajo): remove the unmonitored _get_*_process_manager,
-    #                  _spawn_* and _destroy* when keepalived stops
-    #                  spawning and killing proxies on its own.
     @classmethod
-    def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf):
+    def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf,
+                                            callback=None):
         return external_process.ProcessManager(
-            conf,
-            router_id,
-            ns_name)
+            conf=conf,
+            uuid=router_id,
+            namespace=ns_name,
+            default_cmd_callback=callback)
index 150046b4e2f9a898a5fb9daec6e9eaac753d95cf..e579693b9d338687a3706f6e6c04d819be414425 100644 (file)
@@ -23,7 +23,6 @@ from neutron.agent.common import config as agent_config
 from neutron.agent.dhcp import config as dhcp_config
 from neutron.agent.l3 import agent as l3_agent
 from neutron.agent.linux import dhcp
-from neutron.agent.linux import external_process
 from neutron.agent.linux import interface
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ovs_lib
@@ -70,12 +69,6 @@ def setup_conf():
     return conf
 
 
-def _get_dhcp_process_monitor(config):
-    return external_process.ProcessMonitor(
-        config=config,
-        resource_type='dhcp')
-
-
 def kill_dhcp(conf, namespace):
     """Disable DHCP for a network if DHCP is still active."""
     network_id = namespace.replace(dhcp.NS_PREFIX, '')
@@ -83,7 +76,6 @@ def kill_dhcp(conf, namespace):
     dhcp_driver = importutils.import_object(
         conf.dhcp_driver,
         conf=conf,
-        process_monitor=_get_dhcp_process_monitor(conf),
         network=dhcp.NetModel(conf.use_namespaces, {'id': network_id}),
         plugin=FakeDhcpPlugin())
 
index 961cf92349b0e90f0c86a04e4c37472ac1c4ce90..aca151dd6f7707c21fad0cb469c4778925fcb1d4 100644 (file)
@@ -22,6 +22,7 @@ from neutron.tests.functional.agent.linux import simple_daemon
 
 
 UUID_FORMAT = "test-uuid-%d"
+SERVICE_NAME = "service"
 
 
 class BaseTestProcessMonitor(base.BaseTestCase):
@@ -30,13 +31,13 @@ class BaseTestProcessMonitor(base.BaseTestCase):
         super(BaseTestProcessMonitor, self).setUp()
         cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')
         self._child_processes = []
-        self._ext_processes = None
+        self._process_monitor = None
         self.create_child_processes_manager('respawn')
         self.addCleanup(self.cleanup_spawned_children)
 
     def create_child_processes_manager(self, action):
         cfg.CONF.set_override('check_child_processes_action', action, 'AGENT')
-        self._ext_processes = self.build_process_monitor()
+        self._process_monitor = self.build_process_monitor()
 
     def build_process_monitor(self):
         return external_process.ProcessMonitor(
@@ -56,11 +57,14 @@ class BaseTestProcessMonitor(base.BaseTestCase):
         for child_number in moves.xrange(n):
             uuid = self._child_uuid(child_number)
             _callback = self._make_cmdline_callback(uuid)
-            self._ext_processes.enable(uuid=uuid,
-                                       cmd_callback=_callback,
-                                       service=service)
+            pm = external_process.ProcessManager(
+                conf=cfg.CONF,
+                uuid=uuid,
+                default_cmd_callback=_callback,
+                service=service)
+            pm.enable()
+            self._process_monitor.register(uuid, SERVICE_NAME, pm)
 
-            pm = self._ext_processes.get_process_manager(uuid, service)
             self._child_processes.append(pm)
 
     @staticmethod
@@ -70,16 +74,11 @@ class BaseTestProcessMonitor(base.BaseTestCase):
     def _kill_last_child(self):
         self._child_processes[-1].disable()
 
-    def spawn_child_processes_and_kill_last(self, service=None, number=2):
-        self.spawn_n_children(number, service)
-        self._kill_last_child()
-        self.assertFalse(self._child_processes[-1].active)
-
-    def wait_for_all_childs_respawned(self):
-        def all_childs_active():
+    def wait_for_all_children_respawned(self):
+        def all_children_active():
             return all(pm.active for pm in self._child_processes)
 
-        self._wait_for_condition(all_childs_active)
+        self._wait_for_condition(all_children_active)
 
     def _wait_for_condition(self, exit_condition, extra_time=5):
         # we need to allow extra_time for the check process to happen
@@ -92,24 +91,13 @@ class BaseTestProcessMonitor(base.BaseTestCase):
                 eventlet.sleep(0.01)
 
     def cleanup_spawned_children(self):
-        if self._ext_processes:
-            self._ext_processes.disable_all()
+        for pm in self._child_processes:
+            pm.disable()
 
 
 class TestProcessMonitor(BaseTestProcessMonitor):
 
     def test_respawn_handler(self):
-        self.spawn_child_processes_and_kill_last()
-        self.wait_for_all_childs_respawned()
-
-    def test_new_process_monitor_finds_old_process(self):
-        self.spawn_n_children(1)
-        spawn_process = self._child_processes[-1]
-        uuid = spawn_process.uuid
-
-        another_pm = self.build_process_monitor()
-        self.assertTrue(another_pm.is_active(uuid))
-        self.assertEqual(another_pm.get_pid(uuid), spawn_process.pid)
-
-    def test_tries_to_get_pid_for_unknown_uuid(self):
-        self.assertIsNone(self._ext_processes.get_pid('bad-uuid'))
+        self.spawn_n_children(2)
+        self._kill_last_child()
+        self.wait_for_all_children_respawned()
index 0e65144cc21c3a71059eb2738983a1e9b3561125..fe4093892f3e268be8ef611e37b4e67ccf349a21 100644 (file)
@@ -19,7 +19,7 @@ from neutron.agent.linux import external_process
 from neutron.tests import base
 
 TEST_UUID = 'test-uuid'
-TEST_SERVICE1 = 'testsvc'
+TEST_SERVICE = 'testsvc'
 TEST_PID = 1234
 
 
@@ -27,10 +27,6 @@ class BaseTestProcessMonitor(base.BaseTestCase):
 
     def setUp(self):
         super(BaseTestProcessMonitor, self).setUp()
-        self.pm_patch = mock.patch("neutron.agent.linux.external_process."
-                                   "ProcessManager", side_effect=mock.Mock)
-        self.pmanager = self.pm_patch.start()
-
         self.log_patch = mock.patch("neutron.agent.linux.external_process."
                                     "LOG.error")
         self.error_log = self.log_patch.start()
@@ -49,47 +45,51 @@ class BaseTestProcessMonitor(base.BaseTestCase):
             config=conf,
             resource_type='test')
 
-    def get_monitored_process_manager(self, uuid, service=None):
-        self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None)
-        return self.pmonitor.get_process_manager(uuid, service)
+    def get_monitored_process(self, uuid, service=None):
+        monitored_process = mock.Mock()
+        self.pmonitor.register(uuid=uuid,
+                               service_name=service,
+                               monitored_process=monitored_process)
+        return monitored_process
 
 
 class TestProcessMonitor(BaseTestProcessMonitor):
 
     def test_error_logged(self):
-        pm = self.get_monitored_process_manager(TEST_UUID)
+        pm = self.get_monitored_process(TEST_UUID)
         pm.active = False
         self.pmonitor._check_child_processes()
         self.assertTrue(self.error_log.called)
 
     def test_exit_handler(self):
         self.create_child_process_monitor('exit')
-        pm = self.get_monitored_process_manager(TEST_UUID)
+        pm = self.get_monitored_process(TEST_UUID)
         pm.active = False
         with mock.patch.object(external_process.ProcessMonitor,
                                '_exit_handler') as exit_handler:
             self.pmonitor._check_child_processes()
             exit_handler.assert_called_once_with(TEST_UUID, None)
 
-    def test_different_service_types(self):
-        pm_none = self.get_monitored_process_manager(TEST_UUID)
-        pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
-        self.assertNotEqual(pm_none, pm_svc1)
-
-    def test_active_method(self, service=None):
-        pm = self.get_monitored_process_manager(TEST_UUID, service)
-        pm.active = False
-        self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
-        pm.active = True
-        self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))
-
-    def test_active_method_with_service(self):
-        self.test_active_method(TEST_SERVICE1)
-
-    def test_pid_method(self, service=None):
-        pm = self.get_monitored_process_manager(TEST_UUID, service)
-        pm.pid = TEST_PID
-        self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))
-
-    def test_pid_method_with_service(self):
-        self.test_pid_method(TEST_PID)
+    def test_register(self):
+        pm = self.get_monitored_process(TEST_UUID)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+        self.assertIn(pm, self.pmonitor._monitored_processes.values())
+
+    def test_register_same_service_twice(self):
+        self.get_monitored_process(TEST_UUID)
+        self.get_monitored_process(TEST_UUID)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+
+    def test_register_different_service_types(self):
+        self.get_monitored_process(TEST_UUID)
+        self.get_monitored_process(TEST_UUID, TEST_SERVICE)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 2)
+
+    def test_unregister(self):
+        self.get_monitored_process(TEST_UUID)
+        self.pmonitor.unregister(TEST_UUID, None)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 0)
+
+    def test_unregister_unknown_process(self):
+        self.pmonitor.unregister(TEST_UUID, None)
+        self.assertEqual(len(self.pmonitor._monitored_processes), 0)
index 793713b19cf6ac9333001dba13ba18eac9909c09..2be9afe0049efd33f3493f497a0e89c8d95a4fbb 100644 (file)
@@ -557,15 +557,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
 
     def _process_manager_constructor_call(self):
         return mock.call(conf=cfg.CONF,
-                        uuid=FAKE_NETWORK_UUID,
-                        namespace=FAKE_NETWORK_DHCP_NS,
-                        service=None,
-                        default_cmd_callback=mock.ANY,
-                        pid_file=None,
-                        cmd_addl_env=None)
+                         uuid=FAKE_NETWORK_UUID,
+                         namespace=FAKE_NETWORK_DHCP_NS,
+                         default_cmd_callback=mock.ANY)
 
     def _enable_dhcp_helper(self, network, enable_isolated_metadata=False,
                             is_isolated_network=False):
+        self.dhcp._process_monitor = mock.Mock()
         if enable_isolated_metadata:
             cfg.CONF.set_override('enable_isolated_metadata', True)
         self.plugin.get_network_info.return_value = network
@@ -577,7 +575,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
         if is_isolated_network:
             self.external_process.assert_has_calls([
                 self._process_manager_constructor_call(),
-                mock.call().enable(reload_cfg=False)
+                mock.call().enable()
             ])
         else:
             self.assertFalse(self.external_process.call_count)
@@ -744,10 +742,11 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
         self._disable_dhcp_helper_driver_failure()
 
     def test_enable_isolated_metadata_proxy(self):
+        self.dhcp._process_monitor = mock.Mock()
         self.dhcp.enable_isolated_metadata_proxy(fake_network)
         self.external_process.assert_has_calls([
             self._process_manager_constructor_call(),
-            mock.call().enable(reload_cfg=False)
+            mock.call().enable()
         ])
 
     def test_disable_isolated_metadata_proxy(self):
@@ -757,7 +756,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
             self.dhcp.disable_isolated_metadata_proxy(fake_network)
             destroy.assert_called_once_with(self.dhcp._process_monitor,
                                             fake_network.id,
-                                            fake_network.namespace)
+                                            fake_network.namespace,
+                                            cfg.CONF)
 
     def _test_metadata_network(self, network):
         cfg.CONF.set_override('enable_metadata_network', True)
index 968e70211ce470cc32af37c459b689f41fc16abb..07dc9effbe743a9ab4eeca26c2606a6354710bb5 100644 (file)
@@ -214,6 +214,8 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
         self.external_process_p = mock.patch(
             'neutron.agent.linux.external_process.ProcessManager')
         self.external_process = self.external_process_p.start()
+        self.process_monitor = mock.patch(
+            'neutron.agent.linux.external_process.ProcessMonitor').start()
 
         self.send_arp_p = mock.patch(
             'neutron.agent.linux.ip_lib.send_gratuitous_arp')
@@ -1070,32 +1072,24 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
         self.assertFalse(nat_rules_delta)
         return ri
 
-    def _expected_call_lookup_ri_process_enabled(self, ri, process):
+    def _expected_call_lookup_ri_process(self, ri, process):
         """Expected call if a process is looked up in a router instance."""
         return [mock.call(uuid=ri.router['id'],
                           service=process,
                           default_cmd_callback=mock.ANY,
                           namespace=ri.ns_name,
-                          conf=self.conf,
-                          pid_file=None,
-                          cmd_addl_env=None)]
-
-    def _expected_call_lookup_ri_process_disabled(self, ri, process):
-        """Expected call if a process is looked up in a router instance."""
-        # The ProcessManager does already exist, and it's found via
-        # ProcessMonitor lookup _ensure_process_manager
-        return [mock.call().__nonzero__()]
+                          conf=mock.ANY)]
 
     def _assert_ri_process_enabled(self, ri, process):
         """Verify that process was enabled for a router instance."""
-        expected_calls = self._expected_call_lookup_ri_process_enabled(
+        expected_calls = self._expected_call_lookup_ri_process(
             ri, process)
         expected_calls.append(mock.call().enable(reload_cfg=True))
         self.assertEqual(expected_calls, self.external_process.mock_calls)
 
     def _assert_ri_process_disabled(self, ri, process):
         """Verify that process was disabled for a router instance."""
-        expected_calls = self._expected_call_lookup_ri_process_disabled(
+        expected_calls = self._expected_call_lookup_ri_process(
             ri, process)
         expected_calls.append(mock.call().disable())
         self.assertEqual(expected_calls, self.external_process.mock_calls)
@@ -1158,6 +1152,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
         self._assert_ri_process_enabled(ri, 'radvd')
         # Reset the calls so we can check for disable radvd
         self.external_process.reset_mock()
+        self.process_monitor.reset_mock()
         # Remove the IPv6 interface and reprocess
         del router[l3_constants.INTERFACE_KEY][1]
         self._process_router_instance_for_agent(agent, ri, router)
@@ -1485,6 +1480,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
                 if enableflag:
                     destroy_proxy.assert_called_with(mock.ANY,
                                                      router_id,
+                                                     mock.ANY,
                                                      mock.ANY)
                 else:
                     self.assertFalse(destroy_proxy.call_count)
index e51285dfbda809df0d951a5d4842df6e72253951..4d0b1eb75214481e33bb45b4baf1bb7bbd3536f7 100644 (file)
@@ -623,6 +623,9 @@ class TestBase(base.BaseTestCase):
         self.isdir = mock.patch('os.path.isdir').start()
         self.isdir.return_value = False
 
+        self.external_process = mock.patch(
+            'neutron.agent.linux.external_process.ProcessManager').start()
+
 
 class TestDhcpBase(TestBase):
 
@@ -711,6 +714,10 @@ class TestDhcpLocalProcess(TestBase):
             self.assertTrue(mocks['interface_name'].__set__.called)
             self.assertTrue(mocks['_ensure_network_conf_dir'].called)
 
+    def _assert_disabled(self, lp):
+        self.assertTrue(lp.process_monitor.unregister.called)
+        self.assertTrue(self.external_process().disable.called)
+
     def test_disable_not_active(self):
         attrs_to_mock = dict([(a, mock.DEFAULT) for a in
                               ['active', 'interface_name']])
@@ -719,11 +726,11 @@ class TestDhcpLocalProcess(TestBase):
             mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
             network = FakeDualNetwork()
             lp = LocalChild(self.conf, network)
-            lp.process_monitor.pid.return_value = 5
             lp.device_manager = mock.Mock()
             lp.disable()
             lp.device_manager.destroy.assert_called_once_with(
                 network, 'tap0')
+            self._assert_disabled(lp)
 
     def test_disable_retain_port(self):
         attrs_to_mock = dict([(a, mock.DEFAULT) for a in
@@ -734,7 +741,7 @@ class TestDhcpLocalProcess(TestBase):
             mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
             lp = LocalChild(self.conf, network)
             lp.disable(retain_port=True)
-            self.assertTrue(lp.process_monitor.disable.called)
+            self._assert_disabled(lp)
 
     def test_disable(self):
         attrs_to_mock = dict([(a, mock.DEFAULT) for a in
@@ -745,9 +752,10 @@ class TestDhcpLocalProcess(TestBase):
             mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
             lp = LocalChild(self.conf, network)
             with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip:
-                lp.process_monitor.pid.return_value = 5
                 lp.disable()
 
+            self._assert_disabled(lp)
+
         self.mock_mgr.assert_has_calls([mock.call(self.conf, None),
                                         mock.call().destroy(network, 'tap0')])
 
@@ -761,9 +769,10 @@ class TestDhcpLocalProcess(TestBase):
             mocks['active'].__get__ = mock.Mock(return_value=False)
             lp = LocalChild(self.conf, FakeDualNetwork())
             with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip:
-                lp.process_monitor.pid.return_value = 5
                 lp.disable()
 
+            self._assert_disabled(lp)
+
         ip.return_value.netns.delete.assert_called_with('qdhcp-ns')
 
     def test_get_interface_name(self):
@@ -865,15 +874,11 @@ class TestDnsmasq(TestBase):
             dm.spawn_process()
             self.assertTrue(mocks['_output_opts_file'].called)
 
-            test_pm.enable.assert_called_once_with(
-                uuid=network.id,
-                service='dnsmasq',
-                namespace='qdhcp-ns',
-                cmd_callback=mock.ANY,
-                reload_cfg=False,
-                pid_file=expected_pid_file)
-            call_kwargs = test_pm.method_calls[0][2]
-            cmd_callback = call_kwargs['cmd_callback']
+            self.assertTrue(test_pm.register.called)
+            self.external_process().enable.assert_called_once_with(
+                reload_cfg=False)
+            call_kwargs = self.external_process.mock_calls[0][2]
+            cmd_callback = call_kwargs['default_cmd_callback']
 
             result_cmd = cmd_callback(expected_pid_file)
 
@@ -1261,12 +1266,9 @@ class TestDnsmasq(TestBase):
             test_pm = mock.Mock()
             dm = self._get_dnsmasq(FakeDualNetwork(), test_pm)
             dm.reload_allocations()
-            test_pm.enable.assert_has_calls([mock.call(uuid=mock.ANY,
-                                             cmd_callback=mock.ANY,
-                                             namespace=mock.ANY,
-                                             service=mock.ANY,
-                                             reload_cfg=True,
-                                             pid_file=mock.ANY)])
+            self.assertTrue(test_pm.register.called)
+            self.external_process().enable.assert_called_once_with(
+                reload_cfg=True)
 
             self.safe.assert_has_calls([
                 mock.call(exp_host_name, exp_host_data),
index 244e4a978d29f1b31cfccc7de5e703470464be5d..7d17c7ae2555c528eaaf713e94e278ccb563a824 100644 (file)
@@ -45,8 +45,7 @@ class TestNetnsCleanup(base.BaseTestCase):
             util.kill_dhcp(conf, 'ns')
 
             expected_params = {'conf': conf, 'network': mock.ANY,
-                               'plugin': mock.ANY,
-                               'process_monitor': mock.ANY}
+                               'plugin': mock.ANY}
             import_object.assert_called_once_with('driver', **expected_params)
 
             if dhcp_active: