]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
heat engine move to per-stack periodic watch threads
authorSteven Hardy <shardy@redhat.com>
Mon, 19 Nov 2012 16:55:23 +0000 (16:55 +0000)
committerSteven Hardy <shardy@redhat.com>
Mon, 19 Nov 2012 22:26:20 +0000 (22:26 +0000)
Use the stack thread groups, so a separate watch thread is started
for each stack - this avoids some of the context scoping problems
previously encountered since we can pass the correct context to the
periodic task when starting it

Fixes bug 1078779

Change-Id: I56e6a4b126199587e91548f450956d77ab2158f3
Signed-off-by: Steven Hardy <shardy@redhat.com>
heat/engine/service.py
heat/engine/watchrule.py
heat/tests/test_engine_service.py

index 0e9638b33d548dc5cdac9fea8eb1f22df78975f2..8b75894158e437ed94430f0c825870b35ce73cf0 100644 (file)
@@ -53,12 +53,48 @@ class EngineService(service.Service):
             self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
         self.stg[stack_id].add_thread(func, *args, **kwargs)
 
+    def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+        """
+        Define a periodic task, to be run in a separate thread, in the stack
+        threadgroups.  Periodicity is cfg.CONF.periodic_interval
+        """
+        if stack_id not in self.stg:
+            thr_name = '%s-%s' % (stack_name, stack_id)
+            self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+        self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
+                                     func, *args, **kwargs)
+
+    def _service_task(self):
+        """
+        This is a dummy task which gets queued on the service.Service
+        threadgroup.  Without this service.Service sees nothing running
+        i.e has nothing to wait() on, so the process exits..
+        This could also be used to trigger periodic non-stack-specific
+        housekeeping tasks
+        """
+        pass
+
     def start(self):
         super(EngineService, self).start()
-        admin_context = context.get_admin_context()
+
+        # Create dummy service task, because when there is nothing queued
+        # on self.tg the process exits
         self.tg.add_timer(cfg.CONF.periodic_interval,
-                    self._periodic_watcher_task,
-                    context=admin_context)
+                          self._service_task)
+
+        # Create a periodic_watcher_task per-stack
+        # We use the admin context to get the list of all stacks
+        # then retrieve the stored per-stack context to be passed to
+        # the periodic task
+        admin_context = context.get_admin_context()
+        stacks = db_api.stack_get_all(admin_context)
+        for s in stacks:
+            user_creds = db_api.user_creds_get(s.user_creds_id)
+            stack_context = context.RequestContext.from_dict(user_creds)
+            self._timer_in_thread(s.id, s.name,
+                                  self._periodic_watcher_task,
+                                  context=stack_context,
+                                  sid=s.id)
 
     def identify_stack(self, context, stack_name):
         """
@@ -157,6 +193,12 @@ class EngineService(service.Service):
 
         self._start_in_thread(stack_id, stack_name, stack.create)
 
+        # Schedule a periodic watcher task for this stack
+        self._timer_in_thread(stack_id, stack_name,
+                              self._periodic_watcher_task,
+                              context=context,
+                              sid=stack_id)
+
         return dict(stack.identifier())
 
     def update_stack(self, context, stack_identity, template, params, args):
@@ -353,9 +395,10 @@ class EngineService(service.Service):
 
         return [None, resource.metadata]
 
-    def _periodic_watcher_task(self, context):
+    def _periodic_watcher_task(self, context, sid):
         try:
-            wrn = [w.name for w in db_api.watch_rule_get_all(context)]
+            wrn = [w.name for w in
+                   db_api.watch_rule_get_all_by_stack(context, sid)]
         except Exception as ex:
             logger.warn('periodic_task db error (%s) %s' %
                         ('watch rule removed?', str(ex)))
index 3b2863e61afdc32118594fc6ae69b93118ceaa01..3ff90840ca72e3bcb6d4bcd800260d4cbae9dee1 100644 (file)
@@ -229,29 +229,10 @@ class WatchRule(object):
                         new_state)
             actioned = True
         else:
-            # FIXME : hack workaround for new stack_get_by_name tenant
-            # scoping, this is the simplest possible solution to the
-            # HA/Autoscaling regression described in bug/1078779
-            # Further work in-progress here (shardy) as this
-            # breaks when stack_id is not unique accross tenants
-            sl = [x for x in
-                  db_api.stack_get_all(self.context)
-                  if x.id == self.stack_id]
-            s = None
-            if len(sl) == 1:
-                s = sl[0]
-            elif len(sl) > 1:
-                logger.error("stack %s not unique, " % self.stack_id
-                             + "cannot action watch rule")
-            else:
-                logger.error("stack %s could not be found, " %
-                             self.stack_id + "cannot action watch rule")
-
+            s = db_api.stack_get(self.context, self.stack_id)
             if s and s.status in (parser.Stack.CREATE_COMPLETE,
-                                  parser.Stack.UPDATE_COMPLETE):
-                user_creds = db_api.user_creds_get(s.user_creds_id)
-                ctxt = ctxtlib.RequestContext.from_dict(user_creds)
-                stack = parser.Stack.load(ctxt, stack=s)
+                                          parser.Stack.UPDATE_COMPLETE):
+                stack = parser.Stack.load(self.context, stack=s)
                 for a in self.rule[self.ACTION_MAP[new_state]]:
                     greenpool.spawn_n(stack[a].alarm)
                 actioned = True
index 7538cf25b156a4279f79a76940515a9ed8cd3210..4a6c74ea3f3c97270e164bc326852583faf0f435 100644 (file)
@@ -80,6 +80,10 @@ def setup_mocks(mocks, stack):
 
 
 class DummyThreadGroup(object):
+    def add_timer(self, interval, callback, initial_delay=None,
+                  *args, **kwargs):
+        pass
+
     def add_thread(self, callback, *args, **kwargs):
         pass