self._req = event.Event()
# TaskHandler stopped event
- self._stopped = event.Event()
+ self._stopped = False
# Periodic function trigger
self._monitor = None
self._monitor_busy = False
- self._monitor_stop = None
# Thread handling the task request
self._thread = None
def _check_pending_tasks(self):
"""Check all pending tasks status."""
for resource_id in self._tasks.keys():
- if self._monitor_stop:
- # looping call is asked to stop, return now
+ if self._stopped:
+ # Task manager is stopped, return now
return
tasks = self._tasks[resource_id]
def _abort(self):
"""Abort all tasks."""
+ # put all tasks haven't been received by main thread to queue
+ # so the following abort handling can cover them
+ for t in self._tasks_queue:
+ self._enqueue(t)
+ self._tasks_queue.clear()
+
for resource_id in self._tasks.keys():
tasks = list(self._tasks[resource_id])
for task in tasks:
def run(self):
while True:
try:
+ if self._stopped:
+ # Somehow greenlet.GreenletExit exception is ignored
+ # during unit-test when self._execute() is making db
+ # access. This makes this thread not terminating and
+ # stop() caller wait indefinitely. So we added a check
+ # here before trying to do a block call on getting a
+ # task from queue
+ break
+
# get a task from queue, or timeout for periodic status check
task = self._get_task()
if task.resource_id in self._tasks:
self._enqueue(task)
continue
- status = self._execute(task)
-
- if status != TaskStatus.PENDING:
- self._result(task)
- continue
-
- self._enqueue(task)
+ try:
+ self._execute(task)
+ finally:
+ if task.status is None:
+ # The thread is killed during _execute(). To guarantee
+ # the task been aborted correctly, put it to the queue.
+ self._enqueue(task)
+ elif task.status != TaskStatus.PENDING:
+ self._result(task)
+ else:
+ self._enqueue(task)
except greenlet.GreenletExit:
break
except Exception:
self._monitor.stop()
if self._monitor_busy:
- self._monitor_stop = event.Event()
- self._monitor_stop.wait()
- self._monitor_stop = None
+ self._monitor.wait()
self._abort()
- self._stopped.send()
def add(self, task):
task.id = uuid.uuid1()
return task.id
def stop(self):
- if not self._thread:
+ if self._thread is None:
return
+ self._stopped = True
self._thread.kill()
- self._stopped.wait()
+ self._thread.wait()
self._thread = None
def has_pending_task(self):
self.run()
def _loopingcall_callback():
+ self._monitor_busy = True
try:
- self._monitor_busy = True
self._check_pending_tasks()
- self._monitor_busy = False
- if self._monitor_stop:
- self._monitor_stop.send()
except Exception:
LOG.exception(_("Exception in _check_pending_tasks"))
+ self._monitor_busy = False
- if self._thread:
+ if self._thread is not None:
return self
if interval is None or interval == 0:
interval = self._interval
+ self._stopped = False
self._thread = greenthread.spawn(_inner)
self._monitor = loopingcall.FixedIntervalLoopingCall(
_loopingcall_callback)
self._monitor.start(interval / 1000.0,
interval / 1000.0)
+ # To allow the created thread start running
+ greenthread.sleep(0)
return self
task.wait(TaskState.RESULT)
self.assertTrue(task.userdata['result'])
- def test_task_manager_stop(self):
+ def _test_task_manager_stop(self, exec_wait=False, result_wait=False,
+ stop_wait=0):
def _exec(task):
+ if exec_wait:
+ greenthread.sleep(0.01)
return TaskStatus.PENDING
def _status(task):
- greenthread.sleep(0.1)
+ greenthread.sleep(0.01)
return TaskStatus.PENDING
def _result(task):
+ if result_wait:
+ greenthread.sleep(0)
pass
manager = ts.TaskManager().start(100)
+ manager.stop()
+ manager.start(100)
alltasks = {}
for i in range(100):
tasks.append(task)
alltasks[res] = tasks
- greenthread.sleep(2)
+ greenthread.sleep(stop_wait)
manager.stop()
+
for res, tasks in alltasks.iteritems():
for task in tasks:
self.assertEqual(task.status, TaskStatus.ABORT)
+ def test_task_manager_stop_1(self):
+ self._test_task_manager_stop(True, True, 0)
+
+ def test_task_manager_stop_2(self):
+ self._test_task_manager_stop(True, True, 1)
+
+ def test_task_manager_stop_3(self):
+ self._test_task_manager_stop(False, False, 0)
+
+ def test_task_manager_stop_4(self):
+ self._test_task_manager_stop(False, False, 1)
+
class VcnsDriverTestCase(base.BaseTestCase):