eventlet.sleep(0.01)
def test_stopping_async_process_lifecycle(self):
- with self.assert_max_execution_time():
- proc = async_process.AsyncProcess(['tail', '-f',
- self.test_file_path])
- proc.start()
- self._check_stdout(proc)
- proc.stop()
+ proc = async_process.AsyncProcess(['tail', '-f',
+ self.test_file_path])
+ proc.start()
+ self._check_stdout(proc)
+ proc.stop()
- # Ensure that the process and greenthreads have stopped
- proc._process.wait()
- self.assertEqual(proc._process.returncode, -9)
- for watcher in proc._watchers:
- watcher.wait()
+ # Ensure that the process and greenthreads have stopped
+ proc._process.wait()
+ self.assertEqual(proc._process.returncode, -9)
+ for watcher in proc._watchers:
+ watcher.wait()
def test_async_process_respawns(self):
- with self.assert_max_execution_time():
- proc = async_process.AsyncProcess(['tail', '-f',
- self.test_file_path],
- respawn_interval=0)
- proc.start()
+ proc = async_process.AsyncProcess(['tail', '-f',
+ self.test_file_path],
+ respawn_interval=0)
+ proc.start()
- # Ensure that the same output is read twice
- self._check_stdout(proc)
- pid = proc._get_pid_to_kill()
- proc._kill_process(pid)
- self._check_stdout(proc)
- proc.stop()
+ # Ensure that the same output is read twice
+ self._check_stdout(proc)
+ pid = proc._get_pid_to_kill()
+ proc._kill_process(pid)
+ self._check_stdout(proc)
+ proc.stop()
eventlet.sleep(0.01)
def test_killed_monitor_respawns(self):
- with self.assert_max_execution_time():
- self.monitor.respawn_interval = 0
- old_pid = self.monitor._process.pid
- output1 = self.collect_initial_output()
- pid = self.monitor._get_pid_to_kill()
- self.monitor._kill_process(pid)
- self.monitor._reset_queues()
- while (self.monitor._process.pid == old_pid):
- eventlet.sleep(0.01)
- output2 = self.collect_initial_output()
- # Initial output should appear twice
- self.assertEqual(output1, output2)
+ self.monitor.respawn_interval = 0
+ old_pid = self.monitor._process.pid
+ output1 = self.collect_initial_output()
+ pid = self.monitor._get_pid_to_kill()
+ self.monitor._kill_process(pid)
+ self.monitor._reset_queues()
+ while (self.monitor._process.pid == old_pid):
+ eventlet.sleep(0.01)
+ output2 = self.collect_initial_output()
+ # Initial output should appear twice
+ self.assertEqual(output1, output2)
class TestSimpleInterfaceMonitor(BaseMonitorTest):
self.assertFalse(self.monitor.has_updates,
'has_updates without port addition should be False')
self.create_resource('test-port-', self.bridge.add_port)
- with self.assert_max_execution_time():
- # has_updates after port addition should become True
- while not self.monitor.has_updates:
- eventlet.sleep(0.01)
+ # has_updates after port addition should become True
+ while not self.monitor.has_updates:
+ eventlet.sleep(0.01)
[testenv:functional]
setenv = OS_TEST_PATH=./neutron/tests/functional
+ OS_TEST_TIMEOUT=90
[testenv:dsvm-functional]
setenv = OS_TEST_PATH=./neutron/tests/functional
OS_SUDO_TESTING=1
OS_ROOTWRAP_CMD=sudo /usr/local/bin/neutron-rootwrap /etc/neutron/rootwrap.conf
OS_FAIL_ON_MISSING_DEPS=1
+ OS_TEST_TIMEOUT=90
sitepackages=True
[tox:jenkins]