self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
- self.setup_double_mock_guard()
self.addCleanup(mock.patch.stopall)
if bool_from_env('OS_STDOUT_CAPTURE'):
self.addOnException(self.check_for_systemexit)
self.orig_pid = os.getpid()
- def setup_double_mock_guard(self):
- # mock.patch.stopall() uses a set in python < 3.4 so patches may not
- # be unwound in the same order they were applied. This can leak mocks
- # and cause tests down the line to fail.
- # More info: http://bugs.python.org/issue21239
- #
- # Use mock to patch mock.patch.start to check if a target has already
- # been patched and fail if it has.
- self.first_traceback = {}
- orig_start = mock._patch.start
-
- def new_start(mself):
- mytarget = mself.getter()
- myattr = mself.attribute
- for patch in mself._active_patches:
- if (mytarget, myattr) == (patch.target, patch.attribute):
- key = str((patch.target, patch.attribute))
- self.fail("mock.patch was setup on an already patched "
- "target %s.%s. Stop the original patch before "
- "starting a new one. Traceback of 1st patch: %s"
- % (mytarget, myattr,
- ''.join(self.first_traceback.get(key, []))))
- self.first_traceback[
- str((mytarget, myattr))] = traceback.format_stack()[:-2]
- return orig_start(mself)
-
- mock.patch('mock._patch.start', new=new_start).start()
-
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
if os.getpid() != self.orig_pid:
with mock.patch.object(self.proc, '_kill') as kill:
self.proc._handle_process_error()
- kill.assert_has_calls(mock.call(respawning=False))
+ kill.assert_has_calls([mock.call(respawning=False)])
def test__handle_process_error_kills_without_respawn(self):
self.proc.respawn_interval = 1
with mock.patch('eventlet.sleep') as sleep:
self.proc._handle_process_error()
- kill.assert_has_calls(mock.call(respawning=True))
- sleep.assert_has_calls(mock.call(self.proc.respawn_interval))
+ kill.assert_has_calls([mock.call(respawning=True)])
+ sleep.assert_has_calls([mock.call(self.proc.respawn_interval)])
spawn.assert_called_once_with()
def _test__watch_process(self, callback, kill_event):
with mock.patch.object(daemon.LOG, 'critical') as log_critical:
self.assertRaises(exceptions.FailToDropPrivilegesExit,
daemon.setuid, '321')
- log_critical.assert_once_with(mock.ANY)
+ log_critical.assert_called_once_with(mock.ANY)
def test_setgid_with_name(self):
with mock.patch('grp.getgrnam', return_value=FakeEntry('gr_gid', 123)):
with mock.patch.object(daemon.LOG, 'critical') as log_critical:
self.assertRaises(exceptions.FailToDropPrivilegesExit,
daemon.setgid, '321')
- log_critical.assert_once_with(mock.ANY)
+ log_critical.assert_called_once_with(mock.ANY)
@mock.patch.object(os, 'setgroups')
@mock.patch.object(daemon, 'setgid')
with mock.patch.object(daemon.LOG, 'critical') as log_critical:
self.assertRaises(exceptions.FailToDropPrivilegesExit,
daemon.drop_privileges, 'user')
- log_critical.assert_once_with(mock.ANY)
+ log_critical.assert_called_once_with(mock.ANY)
class TestPidfile(base.BaseTestCase):
with mock.patch.object(ep, 'utils') as utils:
manager.disable()
- utils.assert_has_calls(
- mock.call.execute(['kill', '-9', 4], run_as_root=True))
+ utils.assert_has_calls([
+ mock.call.execute(['kill', '-9', 4],
+ run_as_root=True)])
def test_disable_namespace(self):
with mock.patch.object(ep.ProcessManager, 'pid') as pid:
with mock.patch.object(ep, 'utils') as utils:
manager.disable()
- utils.assert_has_calls(
- mock.call.execute(['kill', '-9', 4], run_as_root=True))
+ utils.assert_has_calls([
+ mock.call.execute(['kill', '-9', 4],
+ run_as_root=True)])
def test_disable_not_active(self):
with mock.patch.object(ep.ProcessManager, 'pid') as pid:
self.ip_dev.assert_has_calls([
mock.call(self.device_name, namespace=self.namespace),
mock.call().link.delete()])
- self.ip.assert_has_calls(mock.call().garbage_collect_namespace())
+ self.ip.assert_has_calls([mock.call().garbage_collect_namespace()])
with polling.get_polling_manager(minimize_polling=True) as pm:
self.assertEqual(pm.__class__,
polling.InterfacePollingMinimizer)
- mock_stop.assert_has_calls(mock.call())
- mock_start.assert_has_calls(mock.call())
+ mock_stop.assert_has_calls([mock.call()])
+ mock_start.assert_has_calls([mock.call()])
class TestInterfacePollingMinimizer(base.BaseTestCase):
def _test__port_action_good_action(self, action, port, expected_call):
self.callbacks._port_action(self.plugin, mock.Mock(),
port, action)
- self.plugin.assert_has_calls(expected_call)
+ self.plugin.assert_has_calls([expected_call])
def test_port_action_create_port(self):
self._test__port_action_good_action(
host='foo_host',
port_id='foo_port_id',
port=port)
- self.plugin.assert_has_calls(
- mock.call.update_port(mock.ANY, 'foo_port_id', expected_port))
+ self.plugin.assert_has_calls([
+ mock.call.update_port(mock.ANY, 'foo_port_id', expected_port)])
def test_release_dhcp_port(self):
port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])
self.metering_rpc_patch = mock.patch(metering_rpc, return_value=[])
self.metering_rpc_patch.start()
- self.driver_patch = mock.patch(self.noop_driver, autospec=True)
+ self.driver_patch = mock.patch(self.noop_driver, spec=True)
self.driver_patch.start()
loopingcall_patch = mock.patch(