1 # Copyright (c) 2014 Red Hat, Inc.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 from oslo_config import cfg
18 from neutron._i18n import _
19 from neutron.agent.linux import external_process
20 from neutron.agent.linux import keepalived
21 from neutron.agent.linux import utils
22 from neutron.tests import base
23 from neutron.tests.unit.agent.linux import test_keepalived
26 class KeepalivedManagerTestCase(base.BaseTestCase,
27 test_keepalived.KeepalivedConfBaseMixin):
30 super(KeepalivedManagerTestCase, self).setUp()
31 cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')
33 self.expected_config = self._get_config()
34 self.process_monitor = external_process.ProcessMonitor(cfg.CONF,
36 self.manager = keepalived.KeepalivedManager(
37 'router1', self.expected_config, self.process_monitor,
38 conf_path=cfg.CONF.state_path)
39 self.addCleanup(self.manager.disable)
41 def test_keepalived_spawn(self):
43 process = external_process.ProcessManager(
47 pids_path=cfg.CONF.state_path)
48 self.assertTrue(process.active)
50 self.assertEqual(self.expected_config.get_config_str(),
51 self.manager.get_conf_on_disk())
53 def _test_keepalived_respawns(self, normal_exit=True):
55 process = self.manager.get_process()
57 utils.wait_until_true(
58 lambda: process.active,
61 exception=RuntimeError(_("Keepalived didn't spawn")))
63 exit_code = '-15' if normal_exit else '-9'
65 # Exit the process, and see that when it comes back
66 # It's indeed a different process
67 utils.execute(['kill', exit_code, pid], run_as_root=True)
68 utils.wait_until_true(
69 lambda: process.active and pid != process.pid,
72 exception=RuntimeError(_("Keepalived didn't respawn")))
74 def test_keepalived_respawns(self):
75 self._test_keepalived_respawns()
77 def test_keepalived_respawn_with_unexpected_exit(self):
78 self._test_keepalived_respawns(False)