def disable_isolated_metadata_proxy(self, network):
metadata_driver.MetadataDriver.destroy_monitored_metadata_proxy(
- self._process_monitor, network.id, network.namespace)
+ self._process_monitor, network.id, network.namespace, self.conf)
class DhcpPluginApi(object):
from oslo_utils import importutils
import six
+from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import ipv6_utils
from neutron.common import utils as commonutils
-from neutron.i18n import _LE, _LI
+from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
self.interface_name = interface_name
self.spawn_process()
+ def _get_process_manager(self, cmd_callback=None):
+ return external_process.ProcessManager(
+ conf=self.conf,
+ uuid=self.network.id,
+ namespace=self.network.namespace,
+ default_cmd_callback=cmd_callback,
+ pid_file=self.get_conf_file_name('pid'))
+
def disable(self, retain_port=False):
"""Disable DHCP for this network by killing the local process."""
- pid_filename = self.get_conf_file_name('pid')
-
- pid = self.process_monitor.get_pid(uuid=self.network.id,
- service=DNSMASQ_SERVICE_NAME,
- pid_file=pid_filename)
-
- self.process_monitor.disable(uuid=self.network.id,
- namespace=self.network.namespace,
- service=DNSMASQ_SERVICE_NAME,
- pid_file=pid_filename)
- if pid and not retain_port:
- self.device_manager.destroy(self.network, self.interface_name)
+ self.process_monitor.unregister(self.network.id, DNSMASQ_SERVICE_NAME)
+ self._get_process_manager().disable()
self._remove_config_files()
-
if not retain_port:
- if self.conf.dhcp_delete_namespaces and self.network.namespace:
- ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
- try:
- ns_ip.netns.delete(self.network.namespace)
- except RuntimeError:
- LOG.exception(_LE('Failed trying to delete namespace: %s'),
- self.network.namespace)
+ self._destroy_namespace_and_port()
+
+ def _destroy_namespace_and_port(self):
+ try:
+ self.device_manager.destroy(self.network, self.interface_name)
+ except RuntimeError:
+ LOG.warning(_LW('Failed trying to delete interface: %s'),
+ self.interface_name)
+
+ if self.conf.dhcp_delete_namespaces and self.network.namespace:
+ ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
+ try:
+ ns_ip.netns.delete(self.network.namespace)
+ except RuntimeError:
+ LOG.warning(_LW('Failed trying to delete namespace: %s'),
+ self.network.namespace)
def _get_value_from_conf_file(self, kind, converter=None):
"""A helper function to read a value from one of the state files."""
@property
def active(self):
- pid_filename = self.get_conf_file_name('pid')
- return self.process_monitor.is_active(self.network.id,
- DNSMASQ_SERVICE_NAME,
- pid_file=pid_filename)
+ return self._get_process_manager().active
@abc.abstractmethod
def spawn_process(self):
self._output_config_files()
- pid_filename = self.get_conf_file_name('pid')
+ pm = self._get_process_manager(
+ cmd_callback=self._build_cmdline_callback)
- self.process_monitor.enable(
- uuid=self.network.id,
- cmd_callback=self._build_cmdline_callback,
- namespace=self.network.namespace,
- service=DNSMASQ_SERVICE_NAME,
- reload_cfg=reload_with_HUP,
- pid_file=pid_filename)
+ pm.enable(reload_cfg=reload_with_HUP)
+
+ self.process_monitor.register(uuid=self.network.id,
+ service_name=DNSMASQ_SERVICE_NAME,
+ monitored_process=pm)
def _release_lease(self, mac_address, ip):
"""Release a DHCP lease."""
# License for the specific language governing permissions and limitations
# under the License.
+import abc
import collections
import os.path
+import six
import eventlet
from oslo_concurrency import lockutils
agent_cfg.register_process_monitor_opts(cfg.CONF)
-class ProcessManager(object):
+@six.add_metaclass(abc.ABCMeta)
+class MonitoredProcess(object):
+ @abc.abstractproperty
+ def active(self):
+ """Boolean representing the running state of the process."""
+
+ @abc.abstractmethod
+ def enable(self):
+ """Enable the service, or respawn the process."""
+
+
+class ProcessManager(MonitoredProcess):
"""An external process manager for Neutron spawned processes.
Note: The manager expects uuid to be in cmdline.
self._config = config
self._resource_type = resource_type
- self._process_managers = {}
+ self._monitored_processes = {}
if self._config.AGENT.check_child_processes_interval:
self._spawn_checking_thread()
- def enable(self, uuid, cmd_callback, namespace=None, service=None,
- reload_cfg=False, cmd_addl_env=None, pid_file=None):
- """Creates a process manager and ensures that it is monitored.
-
- It will create a new ProcessManager and tie it to the uuid/service
- with the new settings, replacing the old one if it existed already.
-
- :param uuid: UUID of the resource this process will serve for.
- :param cmd_callback: Callback function that receives a pid_file
- location and returns a list with the command
- and arguments.
- :param namespace: network namespace to run the process in, if
- necessary.
- :param service: a logical name for the service this process provides,
- it will extend the pid file like pid.%(service)s.
- :param reload_cfg: if the process is active send a HUP signal
- for configuration reload, otherwise spawn it
- normally.
- :param cmd_addl_env: additional environment variables for the
- spawned process.
- :param pid_file: the pid file to store the pid of the external
- process. If not provided, a default will be used.
+ def register(self, uuid, service_name, monitored_process):
+ """Start monitoring a process.
+
+ The given monitored_process will be tied to it's uuid+service_name
+ replacing the old one if it existed already.
+
+ The monitored_process should be enabled before registration,
+ otherwise ProcessMonitor could try to enable the process itself,
+ which could lead to double enable and if unlucky enough, two processes
+ running, and also errors in the logs.
+
+ :param uuid: An ID of the resource for which the process is running.
+ :param service_name: A logical service name for this process monitor,
+ so the same uuid provided via process manager
+ can reference several different services.
+ :param monitored_process: MonitoredProcess we want to monitor.
"""
- process_manager = self._create_process_manager(
- uuid=uuid,
- cmd_callback=cmd_callback,
- namespace=namespace,
- service=service,
- cmd_addl_env=cmd_addl_env,
- pid_file=pid_file)
-
- process_manager.enable(reload_cfg=reload_cfg)
- service_id = ServiceId(uuid, service)
-
- # replace the old process manager with the new one
- self._process_managers[service_id] = process_manager
-
- def disable(self, uuid, namespace=None, service=None,
- pid_file=None):
- """Disables the process and stops monitoring it."""
- service_id = ServiceId(uuid, service)
-
- process_manager = self._ensure_process_manager(
- uuid=uuid,
- service=service,
- pid_file=pid_file,
- namespace=namespace)
- self._process_managers.pop(service_id, None)
-
- process_manager.disable()
-
- def disable_all(self):
- for service_id in self._process_managers.keys():
- self.disable(uuid=service_id.uuid, service=service_id.service)
-
- def get_process_manager(self, uuid, service=None):
- """Returns a process manager for manipulation"""
- service_id = ServiceId(uuid, service)
- return self._process_managers.get(service_id)
-
- def is_active(self, uuid, service=None, pid_file=None):
- return self._ensure_process_manager(
- uuid=uuid,
- service=service,
- pid_file=pid_file).active
-
- def get_pid(self, uuid, service=None, pid_file=None):
- return self._ensure_process_manager(
- uuid=uuid,
- service=service,
- pid_file=pid_file).pid
-
- def get_pid_file_name(self, uuid, service=None):
- return self._ensure_process_manager(
- uuid=uuid,
- service=service).get_pid_file_name()
-
- def _ensure_process_manager(self, uuid, cmd_callback=None,
- namespace=None, service=None,
- cmd_addl_env=None,
- pid_file=None,
- ):
-
- process_manager = self.get_process_manager(uuid, service)
- if not process_manager:
- # if the process existed in a different run of the agent
- # provide one, generally for pid / active evaluation
- process_manager = self._create_process_manager(
- uuid=uuid,
- cmd_callback=cmd_callback,
- namespace=namespace,
- service=service,
- cmd_addl_env=cmd_addl_env,
- pid_file=pid_file)
- return process_manager
-
- def _create_process_manager(self, uuid, cmd_callback, namespace, service,
- cmd_addl_env, pid_file):
- return ProcessManager(conf=self._config,
- uuid=uuid,
- namespace=namespace,
- service=service,
- default_cmd_callback=cmd_callback,
- cmd_addl_env=cmd_addl_env,
- pid_file=pid_file)
+
+ service_id = ServiceId(uuid, service_name)
+ self._monitored_processes[service_id] = monitored_process
+
+ def unregister(self, uuid, service_name):
+ """Stop monitoring a process.
+
+ The uuid+service_name will be removed from the monitored processes.
+
+ The service must be disabled **after** unregistering, otherwise if
+ process monitor checks after you disable the process, and before
+ you unregister it, the process will be respawned, and left orphaned
+ into the system.
+
+ :param uuid: An ID of the resource for which the process is running.
+ :param service_name: A logical service name for this process monitor,
+ so the same uuid provided via process manager
+ can reference several different services.
+ """
+
+ service_id = ServiceId(uuid, service_name)
+ self._monitored_processes.pop(service_id, None)
+
+ def stop(self):
+ """Stop the process monitoring. """
+ self._monitor_processes = False
def _spawn_checking_thread(self):
+ self._monitor_processes = True
eventlet.spawn(self._periodic_checking_thread)
@lockutils.synchronized("_check_child_processes")
# we build the list of keys before iterating in the loop to cover
# the case where other threads add or remove items from the
# dictionary which otherwise will cause a RuntimeError
- for service_id in list(self._process_managers):
- pm = self._process_managers.get(service_id)
+ for service_id in list(self._monitored_processes):
+ pm = self._monitored_processes.get(service_id)
if pm and not pm.active:
LOG.error(_LE("%(service)s for %(resource_type)s "
eventlet.sleep(0)
def _periodic_checking_thread(self):
- while True:
+ while self._monitor_processes:
eventlet.sleep(self._config.AGENT.check_child_processes_interval)
eventlet.spawn(self._check_child_processes)
LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
{'service': service_id.service,
'uuid': service_id.uuid})
- self._process_managers[service_id].enable()
+ self._monitored_processes[service_id].enable()
def _exit_action(self, service_id):
LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
from oslo_config import cfg
import six
+from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.openstack.common import log as logging
utils.replace_file(radvd_conf, buf.getvalue())
return radvd_conf
+ def _get_radvd_process_manager(self, callback=None):
+ return external_process.ProcessManager(
+ uuid=self._router_id,
+ default_cmd_callback=callback,
+ namespace=self._router_ns,
+ service=RADVD_SERVICE_NAME,
+ conf=cfg.CONF)
+
def _spawn_radvd(self, radvd_conf):
def callback(pid_file):
# we need to use -m syslog and f.e. not -m stderr (the default)
'-m', 'syslog']
return radvd_cmd
- self._process_monitor.enable(uuid=self._router_id,
- cmd_callback=callback,
- namespace=self._router_ns,
- service=RADVD_SERVICE_NAME,
- reload_cfg=True)
+ pm = self._get_radvd_process_manager(callback)
+ pm.enable(reload_cfg=True)
+ self._process_monitor.register(uuid=self._router_id,
+ service_name=RADVD_SERVICE_NAME,
+ monitored_process=pm)
LOG.debug("radvd enabled for router %s", self._router_id)
def enable(self, router_ports):
self._spawn_radvd(radvd_conf)
def disable(self):
- self._process_monitor.disable(self._router_id,
- service=RADVD_SERVICE_NAME)
+ self._process_monitor.unregister(uuid=self._router_id,
+ service_name=RADVD_SERVICE_NAME)
+ pm = self._get_radvd_process_manager()
+ pm.disable()
utils.remove_conf_files(cfg.CONF.ra_confs, self._router_id)
LOG.debug("radvd disabled for router %s", self._router_id)
@property
def enabled(self):
- return self._process_monitor.is_active(self._router_id,
- RADVD_SERVICE_NAME)
+ return self._get_radvd_process_manager().active
# Access with redirection to metadata proxy iptables mark mask
METADATA_ACCESS_MARK_MASK = '0xffffffff'
+METADATA_SERVICE_NAME = 'metadata-proxy'
class MetadataDriver(advanced_service.AdvancedService):
self.destroy_monitored_metadata_proxy(self.l3_agent.process_monitor,
router.router['id'],
- router.ns_name)
+ router.ns_name,
+ self.l3_agent.conf)
@classmethod
def metadata_filter_rules(cls, port, mark):
@classmethod
def spawn_monitored_metadata_proxy(cls, monitor, ns_name, port, conf,
network_id=None, router_id=None):
+ uuid = network_id or router_id
callback = cls._get_metadata_proxy_callback(
port, conf, network_id=network_id, router_id=router_id)
- monitor.enable(network_id or router_id, callback, ns_name)
+ pm = cls._get_metadata_proxy_process_manager(uuid, ns_name, conf,
+ callback=callback)
+ pm.enable()
+ monitor.register(uuid, METADATA_SERVICE_NAME, pm)
@classmethod
- def destroy_monitored_metadata_proxy(cls, monitor, uuid, ns_name):
- monitor.disable(uuid, ns_name)
+ def destroy_monitored_metadata_proxy(cls, monitor, uuid, ns_name, conf):
+ monitor.unregister(uuid, METADATA_SERVICE_NAME)
+ pm = cls._get_metadata_proxy_process_manager(uuid, ns_name, conf)
+ pm.disable()
- # TODO(mangelajo): remove the unmonitored _get_*_process_manager,
- # _spawn_* and _destroy* when keepalived stops
- # spawning and killing proxies on its own.
@classmethod
- def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf):
+ def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf,
+ callback=None):
return external_process.ProcessManager(
- conf,
- router_id,
- ns_name)
+ conf=conf,
+ uuid=router_id,
+ namespace=ns_name,
+ default_cmd_callback=callback)
from neutron.agent.dhcp import config as dhcp_config
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.linux import dhcp
-from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
return conf
-def _get_dhcp_process_monitor(config):
- return external_process.ProcessMonitor(
- config=config,
- resource_type='dhcp')
-
-
def kill_dhcp(conf, namespace):
"""Disable DHCP for a network if DHCP is still active."""
network_id = namespace.replace(dhcp.NS_PREFIX, '')
dhcp_driver = importutils.import_object(
conf.dhcp_driver,
conf=conf,
- process_monitor=_get_dhcp_process_monitor(conf),
network=dhcp.NetModel(conf.use_namespaces, {'id': network_id}),
plugin=FakeDhcpPlugin())
UUID_FORMAT = "test-uuid-%d"
+SERVICE_NAME = "service"
class BaseTestProcessMonitor(base.BaseTestCase):
super(BaseTestProcessMonitor, self).setUp()
cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')
self._child_processes = []
- self._ext_processes = None
+ self._process_monitor = None
self.create_child_processes_manager('respawn')
self.addCleanup(self.cleanup_spawned_children)
def create_child_processes_manager(self, action):
cfg.CONF.set_override('check_child_processes_action', action, 'AGENT')
- self._ext_processes = self.build_process_monitor()
+ self._process_monitor = self.build_process_monitor()
def build_process_monitor(self):
return external_process.ProcessMonitor(
for child_number in moves.xrange(n):
uuid = self._child_uuid(child_number)
_callback = self._make_cmdline_callback(uuid)
- self._ext_processes.enable(uuid=uuid,
- cmd_callback=_callback,
- service=service)
+ pm = external_process.ProcessManager(
+ conf=cfg.CONF,
+ uuid=uuid,
+ default_cmd_callback=_callback,
+ service=service)
+ pm.enable()
+ self._process_monitor.register(uuid, SERVICE_NAME, pm)
- pm = self._ext_processes.get_process_manager(uuid, service)
self._child_processes.append(pm)
@staticmethod
def _kill_last_child(self):
self._child_processes[-1].disable()
- def spawn_child_processes_and_kill_last(self, service=None, number=2):
- self.spawn_n_children(number, service)
- self._kill_last_child()
- self.assertFalse(self._child_processes[-1].active)
-
- def wait_for_all_childs_respawned(self):
- def all_childs_active():
+ def wait_for_all_children_respawned(self):
+ def all_children_active():
return all(pm.active for pm in self._child_processes)
- self._wait_for_condition(all_childs_active)
+ self._wait_for_condition(all_children_active)
def _wait_for_condition(self, exit_condition, extra_time=5):
# we need to allow extra_time for the check process to happen
eventlet.sleep(0.01)
def cleanup_spawned_children(self):
- if self._ext_processes:
- self._ext_processes.disable_all()
+ for pm in self._child_processes:
+ pm.disable()
class TestProcessMonitor(BaseTestProcessMonitor):
def test_respawn_handler(self):
- self.spawn_child_processes_and_kill_last()
- self.wait_for_all_childs_respawned()
-
- def test_new_process_monitor_finds_old_process(self):
- self.spawn_n_children(1)
- spawn_process = self._child_processes[-1]
- uuid = spawn_process.uuid
-
- another_pm = self.build_process_monitor()
- self.assertTrue(another_pm.is_active(uuid))
- self.assertEqual(another_pm.get_pid(uuid), spawn_process.pid)
-
- def test_tries_to_get_pid_for_unknown_uuid(self):
- self.assertIsNone(self._ext_processes.get_pid('bad-uuid'))
+ self.spawn_n_children(2)
+ self._kill_last_child()
+ self.wait_for_all_children_respawned()
from neutron.tests import base
TEST_UUID = 'test-uuid'
-TEST_SERVICE1 = 'testsvc'
+TEST_SERVICE = 'testsvc'
TEST_PID = 1234
def setUp(self):
super(BaseTestProcessMonitor, self).setUp()
- self.pm_patch = mock.patch("neutron.agent.linux.external_process."
- "ProcessManager", side_effect=mock.Mock)
- self.pmanager = self.pm_patch.start()
-
self.log_patch = mock.patch("neutron.agent.linux.external_process."
"LOG.error")
self.error_log = self.log_patch.start()
config=conf,
resource_type='test')
- def get_monitored_process_manager(self, uuid, service=None):
- self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None)
- return self.pmonitor.get_process_manager(uuid, service)
+ def get_monitored_process(self, uuid, service=None):
+ monitored_process = mock.Mock()
+ self.pmonitor.register(uuid=uuid,
+ service_name=service,
+ monitored_process=monitored_process)
+ return monitored_process
class TestProcessMonitor(BaseTestProcessMonitor):
def test_error_logged(self):
- pm = self.get_monitored_process_manager(TEST_UUID)
+ pm = self.get_monitored_process(TEST_UUID)
pm.active = False
self.pmonitor._check_child_processes()
self.assertTrue(self.error_log.called)
def test_exit_handler(self):
self.create_child_process_monitor('exit')
- pm = self.get_monitored_process_manager(TEST_UUID)
+ pm = self.get_monitored_process(TEST_UUID)
pm.active = False
with mock.patch.object(external_process.ProcessMonitor,
'_exit_handler') as exit_handler:
self.pmonitor._check_child_processes()
exit_handler.assert_called_once_with(TEST_UUID, None)
- def test_different_service_types(self):
- pm_none = self.get_monitored_process_manager(TEST_UUID)
- pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
- self.assertNotEqual(pm_none, pm_svc1)
-
- def test_active_method(self, service=None):
- pm = self.get_monitored_process_manager(TEST_UUID, service)
- pm.active = False
- self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
- pm.active = True
- self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))
-
- def test_active_method_with_service(self):
- self.test_active_method(TEST_SERVICE1)
-
- def test_pid_method(self, service=None):
- pm = self.get_monitored_process_manager(TEST_UUID, service)
- pm.pid = TEST_PID
- self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))
-
- def test_pid_method_with_service(self):
- self.test_pid_method(TEST_PID)
+ def test_register(self):
+ pm = self.get_monitored_process(TEST_UUID)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+ self.assertIn(pm, self.pmonitor._monitored_processes.values())
+
+ def test_register_same_service_twice(self):
+ self.get_monitored_process(TEST_UUID)
+ self.get_monitored_process(TEST_UUID)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 1)
+
+ def test_register_different_service_types(self):
+ self.get_monitored_process(TEST_UUID)
+ self.get_monitored_process(TEST_UUID, TEST_SERVICE)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 2)
+
+ def test_unregister(self):
+ self.get_monitored_process(TEST_UUID)
+ self.pmonitor.unregister(TEST_UUID, None)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 0)
+
+ def test_unregister_unknown_process(self):
+ self.pmonitor.unregister(TEST_UUID, None)
+ self.assertEqual(len(self.pmonitor._monitored_processes), 0)
def _process_manager_constructor_call(self):
return mock.call(conf=cfg.CONF,
- uuid=FAKE_NETWORK_UUID,
- namespace=FAKE_NETWORK_DHCP_NS,
- service=None,
- default_cmd_callback=mock.ANY,
- pid_file=None,
- cmd_addl_env=None)
+ uuid=FAKE_NETWORK_UUID,
+ namespace=FAKE_NETWORK_DHCP_NS,
+ default_cmd_callback=mock.ANY)
def _enable_dhcp_helper(self, network, enable_isolated_metadata=False,
is_isolated_network=False):
+ self.dhcp._process_monitor = mock.Mock()
if enable_isolated_metadata:
cfg.CONF.set_override('enable_isolated_metadata', True)
self.plugin.get_network_info.return_value = network
if is_isolated_network:
self.external_process.assert_has_calls([
self._process_manager_constructor_call(),
- mock.call().enable(reload_cfg=False)
+ mock.call().enable()
])
else:
self.assertFalse(self.external_process.call_count)
self._disable_dhcp_helper_driver_failure()
def test_enable_isolated_metadata_proxy(self):
+ self.dhcp._process_monitor = mock.Mock()
self.dhcp.enable_isolated_metadata_proxy(fake_network)
self.external_process.assert_has_calls([
self._process_manager_constructor_call(),
- mock.call().enable(reload_cfg=False)
+ mock.call().enable()
])
def test_disable_isolated_metadata_proxy(self):
self.dhcp.disable_isolated_metadata_proxy(fake_network)
destroy.assert_called_once_with(self.dhcp._process_monitor,
fake_network.id,
- fake_network.namespace)
+ fake_network.namespace,
+ cfg.CONF)
def _test_metadata_network(self, network):
cfg.CONF.set_override('enable_metadata_network', True)
self.external_process_p = mock.patch(
'neutron.agent.linux.external_process.ProcessManager')
self.external_process = self.external_process_p.start()
+ self.process_monitor = mock.patch(
+ 'neutron.agent.linux.external_process.ProcessMonitor').start()
self.send_arp_p = mock.patch(
'neutron.agent.linux.ip_lib.send_gratuitous_arp')
self.assertFalse(nat_rules_delta)
return ri
- def _expected_call_lookup_ri_process_enabled(self, ri, process):
+ def _expected_call_lookup_ri_process(self, ri, process):
"""Expected call if a process is looked up in a router instance."""
return [mock.call(uuid=ri.router['id'],
service=process,
default_cmd_callback=mock.ANY,
namespace=ri.ns_name,
- conf=self.conf,
- pid_file=None,
- cmd_addl_env=None)]
-
- def _expected_call_lookup_ri_process_disabled(self, ri, process):
- """Expected call if a process is looked up in a router instance."""
- # The ProcessManager does already exist, and it's found via
- # ProcessMonitor lookup _ensure_process_manager
- return [mock.call().__nonzero__()]
+ conf=mock.ANY)]
def _assert_ri_process_enabled(self, ri, process):
"""Verify that process was enabled for a router instance."""
- expected_calls = self._expected_call_lookup_ri_process_enabled(
+ expected_calls = self._expected_call_lookup_ri_process(
ri, process)
expected_calls.append(mock.call().enable(reload_cfg=True))
self.assertEqual(expected_calls, self.external_process.mock_calls)
def _assert_ri_process_disabled(self, ri, process):
"""Verify that process was disabled for a router instance."""
- expected_calls = self._expected_call_lookup_ri_process_disabled(
+ expected_calls = self._expected_call_lookup_ri_process(
ri, process)
expected_calls.append(mock.call().disable())
self.assertEqual(expected_calls, self.external_process.mock_calls)
self._assert_ri_process_enabled(ri, 'radvd')
# Reset the calls so we can check for disable radvd
self.external_process.reset_mock()
+ self.process_monitor.reset_mock()
# Remove the IPv6 interface and reprocess
del router[l3_constants.INTERFACE_KEY][1]
self._process_router_instance_for_agent(agent, ri, router)
if enableflag:
destroy_proxy.assert_called_with(mock.ANY,
router_id,
+ mock.ANY,
mock.ANY)
else:
self.assertFalse(destroy_proxy.call_count)
self.isdir = mock.patch('os.path.isdir').start()
self.isdir.return_value = False
+ self.external_process = mock.patch(
+ 'neutron.agent.linux.external_process.ProcessManager').start()
+
class TestDhcpBase(TestBase):
self.assertTrue(mocks['interface_name'].__set__.called)
self.assertTrue(mocks['_ensure_network_conf_dir'].called)
+ def _assert_disabled(self, lp):
+ self.assertTrue(lp.process_monitor.unregister.called)
+ self.assertTrue(self.external_process().disable.called)
+
def test_disable_not_active(self):
attrs_to_mock = dict([(a, mock.DEFAULT) for a in
['active', 'interface_name']])
mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
network = FakeDualNetwork()
lp = LocalChild(self.conf, network)
- lp.process_monitor.pid.return_value = 5
lp.device_manager = mock.Mock()
lp.disable()
lp.device_manager.destroy.assert_called_once_with(
network, 'tap0')
+ self._assert_disabled(lp)
def test_disable_retain_port(self):
attrs_to_mock = dict([(a, mock.DEFAULT) for a in
mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
lp = LocalChild(self.conf, network)
lp.disable(retain_port=True)
- self.assertTrue(lp.process_monitor.disable.called)
+ self._assert_disabled(lp)
def test_disable(self):
attrs_to_mock = dict([(a, mock.DEFAULT) for a in
mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
lp = LocalChild(self.conf, network)
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip:
- lp.process_monitor.pid.return_value = 5
lp.disable()
+ self._assert_disabled(lp)
+
self.mock_mgr.assert_has_calls([mock.call(self.conf, None),
mock.call().destroy(network, 'tap0')])
mocks['active'].__get__ = mock.Mock(return_value=False)
lp = LocalChild(self.conf, FakeDualNetwork())
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip:
- lp.process_monitor.pid.return_value = 5
lp.disable()
+ self._assert_disabled(lp)
+
ip.return_value.netns.delete.assert_called_with('qdhcp-ns')
def test_get_interface_name(self):
dm.spawn_process()
self.assertTrue(mocks['_output_opts_file'].called)
- test_pm.enable.assert_called_once_with(
- uuid=network.id,
- service='dnsmasq',
- namespace='qdhcp-ns',
- cmd_callback=mock.ANY,
- reload_cfg=False,
- pid_file=expected_pid_file)
- call_kwargs = test_pm.method_calls[0][2]
- cmd_callback = call_kwargs['cmd_callback']
+ self.assertTrue(test_pm.register.called)
+ self.external_process().enable.assert_called_once_with(
+ reload_cfg=False)
+ call_kwargs = self.external_process.mock_calls[0][2]
+ cmd_callback = call_kwargs['default_cmd_callback']
result_cmd = cmd_callback(expected_pid_file)
test_pm = mock.Mock()
dm = self._get_dnsmasq(FakeDualNetwork(), test_pm)
dm.reload_allocations()
- test_pm.enable.assert_has_calls([mock.call(uuid=mock.ANY,
- cmd_callback=mock.ANY,
- namespace=mock.ANY,
- service=mock.ANY,
- reload_cfg=True,
- pid_file=mock.ANY)])
+ self.assertTrue(test_pm.register.called)
+ self.external_process().enable.assert_called_once_with(
+ reload_cfg=True)
self.safe.assert_has_calls([
mock.call(exp_host_name, exp_host_data),
util.kill_dhcp(conf, 'ns')
expected_params = {'conf': conf, 'network': mock.ANY,
- 'plugin': mock.ANY,
- 'process_monitor': mock.ANY}
+ 'plugin': mock.ANY}
import_object.assert_called_once_with('driver', **expected_params)
if dhcp_active: