]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix IF checks on spawned green thread instance
authorKaiwei Fan <fank@vmware.com>
Thu, 5 Sep 2013 20:57:13 +0000 (13:57 -0700)
committerKaiwei Fan <fank@vmware.com>
Fri, 6 Sep 2013 22:24:42 +0000 (15:24 -0700)
Initially the symptom looks like race condition between two threads when
stopping the task manager. After further analysis/troubleshooting, it
turns out that two threads are spawned if a task manager is stopped and
started again, causing unexpected errors.

The IF check on the spawned thread sometimes return True sometime return False
if not compared against None explicitly. This makes start() method
think no thread has been started or stop() method think no thread is started.
Change the check to compare against None.

Also fixed a problem in unit-test where a thread may never terminated when
a stop call is invoked during db access.

Closes-Bug: 1221486
Change-Id: I0d67bfe8fef7a390f0d6bc0f5a42835f86a9fb27

neutron/plugins/nicira/vshield/tasks/tasks.py
neutron/tests/unit/nicira/test_vcns_driver.py

index 5a76986961e36e774034ae066c0b581ab1bf7f82..2128b3cbd671061883cdaf3f852011e6242ac4fb 100755 (executable)
@@ -169,12 +169,11 @@ class TaskManager():
         self._req = event.Event()
 
         # TaskHandler stopped event
-        self._stopped = event.Event()
+        self._stopped = False
 
         # Periodic function trigger
         self._monitor = None
         self._monitor_busy = False
-        self._monitor_stop = None
 
         # Thread handling the task request
         self._thread = None
@@ -221,8 +220,8 @@ class TaskManager():
     def _check_pending_tasks(self):
         """Check all pending tasks status."""
         for resource_id in self._tasks.keys():
-            if self._monitor_stop:
-                # looping call is asked to stop, return now
+            if self._stopped:
+                # Task manager is stopped, return now
                 return
 
             tasks = self._tasks[resource_id]
@@ -270,6 +269,12 @@ class TaskManager():
 
     def _abort(self):
         """Abort all tasks."""
+        # put all tasks haven't been received by main thread to queue
+        # so the following abort handling can cover them
+        for t in self._tasks_queue:
+            self._enqueue(t)
+        self._tasks_queue.clear()
+
         for resource_id in self._tasks.keys():
             tasks = list(self._tasks[resource_id])
             for task in tasks:
@@ -287,6 +292,15 @@ class TaskManager():
     def run(self):
         while True:
             try:
+                if self._stopped:
+                    # Somehow greenlet.GreenletExit exception is ignored
+                    # during unit-test when self._execute() is making db
+                    # access. This makes this thread not terminating and
+                    # stop() caller wait indefinitely. So we added a check
+                    # here before trying to do a block call on getting a
+                    # task from queue
+                    break
+
                 # get a task from queue, or timeout for periodic status check
                 task = self._get_task()
                 if task.resource_id in self._tasks:
@@ -295,13 +309,17 @@ class TaskManager():
                     self._enqueue(task)
                     continue
 
-                status = self._execute(task)
-
-                if status != TaskStatus.PENDING:
-                    self._result(task)
-                    continue
-
-                self._enqueue(task)
+                try:
+                    self._execute(task)
+                finally:
+                    if task.status is None:
+                        # The thread is killed during _execute(). To guarantee
+                        # the task been aborted correctly, put it to the queue.
+                        self._enqueue(task)
+                    elif task.status != TaskStatus.PENDING:
+                        self._result(task)
+                    else:
+                        self._enqueue(task)
             except greenlet.GreenletExit:
                 break
             except Exception:
@@ -310,11 +328,8 @@ class TaskManager():
 
         self._monitor.stop()
         if self._monitor_busy:
-            self._monitor_stop = event.Event()
-            self._monitor_stop.wait()
-            self._monitor_stop = None
+            self._monitor.wait()
         self._abort()
-        self._stopped.send()
 
     def add(self, task):
         task.id = uuid.uuid1()
@@ -324,10 +339,11 @@ class TaskManager():
         return task.id
 
     def stop(self):
-        if not self._thread:
+        if self._thread is None:
             return
+        self._stopped = True
         self._thread.kill()
-        self._stopped.wait()
+        self._thread.wait()
         self._thread = None
 
     def has_pending_task(self):
@@ -357,26 +373,27 @@ class TaskManager():
             self.run()
 
         def _loopingcall_callback():
+            self._monitor_busy = True
             try:
-                self._monitor_busy = True
                 self._check_pending_tasks()
-                self._monitor_busy = False
-                if self._monitor_stop:
-                    self._monitor_stop.send()
             except Exception:
                 LOG.exception(_("Exception in _check_pending_tasks"))
+            self._monitor_busy = False
 
-        if self._thread:
+        if self._thread is not None:
             return self
 
         if interval is None or interval == 0:
             interval = self._interval
 
+        self._stopped = False
         self._thread = greenthread.spawn(_inner)
         self._monitor = loopingcall.FixedIntervalLoopingCall(
             _loopingcall_callback)
         self._monitor.start(interval / 1000.0,
                             interval / 1000.0)
+        # To allow the created thread start running
+        greenthread.sleep(0)
 
         return self
 
index b5867f4b73a9485ab1af031ccf5fb453c525df4c..e26e2a1318cba84e522ef7896ae62e27157a0deb 100644 (file)
@@ -212,18 +212,25 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
             task.wait(TaskState.RESULT)
             self.assertTrue(task.userdata['result'])
 
-    def test_task_manager_stop(self):
+    def _test_task_manager_stop(self, exec_wait=False, result_wait=False,
+                                stop_wait=0):
         def _exec(task):
+            if exec_wait:
+                greenthread.sleep(0.01)
             return TaskStatus.PENDING
 
         def _status(task):
-            greenthread.sleep(0.1)
+            greenthread.sleep(0.01)
             return TaskStatus.PENDING
 
         def _result(task):
+            if result_wait:
+                greenthread.sleep(0)
             pass
 
         manager = ts.TaskManager().start(100)
+        manager.stop()
+        manager.start(100)
 
         alltasks = {}
         for i in range(100):
@@ -235,12 +242,25 @@ class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
                 tasks.append(task)
             alltasks[res] = tasks
 
-        greenthread.sleep(2)
+        greenthread.sleep(stop_wait)
         manager.stop()
+
         for res, tasks in alltasks.iteritems():
             for task in tasks:
                 self.assertEqual(task.status, TaskStatus.ABORT)
 
+    def test_task_manager_stop_1(self):
+        self._test_task_manager_stop(True, True, 0)
+
+    def test_task_manager_stop_2(self):
+        self._test_task_manager_stop(True, True, 1)
+
+    def test_task_manager_stop_3(self):
+        self._test_task_manager_stop(False, False, 0)
+
+    def test_task_manager_stop_4(self):
+        self._test_task_manager_stop(False, False, 1)
+
 
 class VcnsDriverTestCase(base.BaseTestCase):