self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id].add_thread(func, *args, **kwargs)
+ def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+ """
+ Define a periodic task, to be run in a separate thread, in the stack
+ threadgroups. Periodicity is cfg.CONF.periodic_interval
+ """
+ if stack_id not in self.stg:
+ thr_name = '%s-%s' % (stack_name, stack_id)
+ self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+ self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
+ func, *args, **kwargs)
+
+ def _service_task(self):
+ """
+ This is a dummy task which gets queued on the service.Service
+ threadgroup. Without this service.Service sees nothing running
+ i.e has nothing to wait() on, so the process exits..
+ This could also be used to trigger periodic non-stack-specific
+ housekeeping tasks
+ """
+ pass
+
def start(self):
super(EngineService, self).start()
- admin_context = context.get_admin_context()
+
+ # Create dummy service task, because when there is nothing queued
+ # on self.tg the process exits
self.tg.add_timer(cfg.CONF.periodic_interval,
- self._periodic_watcher_task,
- context=admin_context)
+ self._service_task)
+
+ # Create a periodic_watcher_task per-stack
+ # We use the admin context to get the list of all stacks
+ # then retrieve the stored per-stack context to be passed to
+ # the periodic task
+ admin_context = context.get_admin_context()
+ stacks = db_api.stack_get_all(admin_context)
+ for s in stacks:
+ user_creds = db_api.user_creds_get(s.user_creds_id)
+ stack_context = context.RequestContext.from_dict(user_creds)
+ self._timer_in_thread(s.id, s.name,
+ self._periodic_watcher_task,
+ context=stack_context,
+ sid=s.id)
def identify_stack(self, context, stack_name):
"""
self._start_in_thread(stack_id, stack_name, stack.create)
+ # Schedule a periodic watcher task for this stack
+ self._timer_in_thread(stack_id, stack_name,
+ self._periodic_watcher_task,
+ context=context,
+ sid=stack_id)
+
return dict(stack.identifier())
def update_stack(self, context, stack_identity, template, params, args):
return [None, resource.metadata]
- def _periodic_watcher_task(self, context):
+ def _periodic_watcher_task(self, context, sid):
try:
- wrn = [w.name for w in db_api.watch_rule_get_all(context)]
+ wrn = [w.name for w in
+ db_api.watch_rule_get_all_by_stack(context, sid)]
except Exception as ex:
logger.warn('periodic_task db error (%s) %s' %
('watch rule removed?', str(ex)))
new_state)
actioned = True
else:
- # FIXME : hack workaround for new stack_get_by_name tenant
- # scoping, this is the simplest possible solution to the
- # HA/Autoscaling regression described in bug/1078779
- # Further work in-progress here (shardy) as this
- # breaks when stack_id is not unique accross tenants
- sl = [x for x in
- db_api.stack_get_all(self.context)
- if x.id == self.stack_id]
- s = None
- if len(sl) == 1:
- s = sl[0]
- elif len(sl) > 1:
- logger.error("stack %s not unique, " % self.stack_id
- + "cannot action watch rule")
- else:
- logger.error("stack %s could not be found, " %
- self.stack_id + "cannot action watch rule")
-
+ s = db_api.stack_get(self.context, self.stack_id)
if s and s.status in (parser.Stack.CREATE_COMPLETE,
- parser.Stack.UPDATE_COMPLETE):
- user_creds = db_api.user_creds_get(s.user_creds_id)
- ctxt = ctxtlib.RequestContext.from_dict(user_creds)
- stack = parser.Stack.load(ctxt, stack=s)
+ parser.Stack.UPDATE_COMPLETE):
+ stack = parser.Stack.load(self.context, stack=s)
for a in self.rule[self.ACTION_MAP[new_state]]:
greenpool.spawn_n(stack[a].alarm)
actioned = True