--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Openstack, LLC.
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gc
+import pprint
+import sys
+import traceback
+
+import eventlet
+import eventlet.backdoor
+import greenlet
+
+from heat.openstack.common import cfg
+
+eventlet_backdoor_opts = [
+ cfg.IntOpt('backdoor_port',
+ default=None,
+ help='port for eventlet backdoor to listen')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(eventlet_backdoor_opts)
+
+
+def _dont_use_this():
+ print "Don't use this, just disconnect instead"
+
+
+def _find_objects(t):
+ return filter(lambda o: isinstance(o, t), gc.get_objects())
+
+
+def _print_greenthreads():
+ for i, gt in enumerate(find_objects(greenlet.greenlet)):
+ print i, gt
+ traceback.print_stack(gt.gr_frame)
+ print
+
+
+def initialize_if_enabled():
+ backdoor_locals = {
+ 'exit': _dont_use_this, # So we don't exit the entire process
+ 'quit': _dont_use_this, # So we don't exit the entire process
+ 'fo': _find_objects,
+ 'pgt': _print_greenthreads,
+ }
+
+ if CONF.backdoor_port is None:
+ return
+
+ # NOTE(johannes): The standard sys.displayhook will print the value of
+ # the last expression and set it to __builtin__._, which overwrites
+ # the __builtin__._ that gettext sets. Let's switch to using pprint
+ # since it won't interact poorly with gettext, and it's easier to
+ # read the output too.
+ def displayhook(val):
+ if val is not None:
+ pprint.pprint(val)
+ sys.displayhook = displayhook
+
+ eventlet.spawn(eventlet.backdoor.backdoor_server,
+ eventlet.listen(('localhost', CONF.backdoor_port)),
+ locals=backdoor_locals)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from eventlet import event
+from eventlet import greenthread
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class LoopingCallDone(Exception):
+ """Exception to break out and stop a LoopingCall.
+
+ The poll-function passed to LoopingCall can raise this exception to
+ break out of the loop normally. This is somewhat analogous to
+ StopIteration.
+
+ An optional return-value can be included as the argument to the exception;
+ this return-value will be returned by LoopingCall.wait()
+
+ """
+
+ def __init__(self, retvalue=True):
+ """:param retvalue: Value that LoopingCall.wait() should return."""
+ self.retvalue = retvalue
+
+
+class LoopingCall(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._running = False
+
+ def start(self, interval, initial_delay=None):
+ self._running = True
+ done = event.Event()
+
+ def _inner():
+ if initial_delay:
+ greenthread.sleep(initial_delay)
+
+ try:
+ while self._running:
+ self.f(*self.args, **self.kw)
+ if not self._running:
+ break
+ greenthread.sleep(interval)
+ except LoopingCallDone, e:
+ self.stop()
+ done.send(e.retvalue)
+ except Exception:
+ LOG.exception(_('in looping call'))
+ done.send_exception(*sys.exc_info())
+ return
+ else:
+ done.send(True)
+
+ self.done = done
+
+ greenthread.spawn(_inner)
+ return self.done
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import errno
+import os
+import random
+import signal
+import sys
+import time
+
+import eventlet
+import greenlet
+import logging as std_logging
+
+from heat.openstack.common import cfg
+from heat.openstack.common import eventlet_backdoor
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common import threadgroup
+
+try:
+ from heat.openstack.common import rpc
+except ImportError:
+ rpc = None
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class Launcher(object):
+ """Launch one or more services and wait for them to complete."""
+
+ def __init__(self):
+ """Initialize the service launcher.
+
+ :returns: None
+
+ """
+ self._services = []
+ eventlet_backdoor.initialize_if_enabled()
+
+ @staticmethod
+ def run_service(service):
+ """Start and wait for a service to finish.
+
+ :param service: service to run and wait for.
+ :returns: None
+
+ """
+ service.start()
+ service.wait()
+
+ def launch_service(self, service):
+ """Load and start the given service.
+
+ :param service: The service you would like to start.
+ :returns: None
+
+ """
+ gt = eventlet.spawn(self.run_service, service)
+ self._services.append(gt)
+
+ def stop(self):
+ """Stop all services which are currently running.
+
+ :returns: None
+
+ """
+ for service in self._services:
+ service.kill()
+
+ def wait(self):
+ """Waits until all services have been stopped, and then returns.
+
+ :returns: None
+
+ """
+ for service in self._services:
+ try:
+ service.wait()
+ except greenlet.GreenletExit:
+ pass
+
+
+class SignalExit(SystemExit):
+ def __init__(self, signo, exccode=1):
+ super(SignalExit, self).__init__(exccode)
+ self.signo = signo
+
+
+class ServiceLauncher(Launcher):
+ def _handle_signal(self, signo, frame):
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ raise SignalExit(signo)
+
+ def wait(self):
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ status = None
+ try:
+ super(ServiceLauncher, self).wait()
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ finally:
+ self.stop()
+ if rpc:
+ rpc.cleanup()
+ return status
+
+
+class ServiceWrapper(object):
+ def __init__(self, service, workers):
+ self.service = service
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.sigcaught = None
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ self.sigcaught = signo
+ self.running = False
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, service):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ raise SignalExit(signal.SIGTERM)
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_service(service)
+
+ def _start_child(self, wrap):
+ if len(wrap.forktimes) > wrap.workers:
+ # Limit ourselves to one process a second (over the period of
+ # number of workers * 1 second). This will allow workers to
+ # start up quickly but ensure we don't fork off children that
+ # die instantly too quickly.
+ if time.time() - wrap.forktimes[0] < wrap.workers:
+ LOG.info(_('Forking too fast, sleeping'))
+ time.sleep(1)
+
+ wrap.forktimes.pop(0)
+
+ wrap.forktimes.append(time.time())
+
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ status = 0
+ try:
+ self._child_process(wrap.service)
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ wrap.service.stop()
+
+ os._exit(status)
+
+ LOG.info(_('Started child %d'), pid)
+
+ wrap.children.add(pid)
+ self.children[pid] = wrap
+
+ return pid
+
+ def launch_service(self, service, workers=1):
+ wrap = ServiceWrapper(service, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ pid, status = os.wait()
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary"""
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
+
+
+class Service(object):
+ """Service object for binaries running on hosts."""
+
+ def __init__(self):
+ self.tg = threadgroup.ThreadGroup('service')
+
+ def start(self):
+ pass
+
+ def stop(self):
+ self.tg.stop()
+
+ def wait(self):
+ self.tg.wait()
+
+
+def launch(service, workers=None):
+ if workers:
+ launcher = ProcessLauncher()
+ launcher.launch_service(service, workers=workers)
+ else:
+ launcher = ServiceLauncher()
+ launcher.launch_service(service)
+ return launcher
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from eventlet import greenlet
+from eventlet import greenpool
+from eventlet import greenthread
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common import loopingcall
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+ '''
+ Callback function to be passed to GreenThread.link() when we spawn()
+ Calls the ThreadGroup to notify if.
+ '''
+ kwargs['group'].thread_done(kwargs['thread'])
+
+
+class Thread(object):
+ """
+ Wrapper around a greenthread, that holds a reference to
+ the ThreadGroup. The Thread will notify the ThreadGroup
+ when it has done so it can be removed from the threads
+ list.
+ """
+ def __init__(self, name, thread, group):
+ self.name = name
+ self.thread = thread
+ self.thread.link(_thread_done, group=group, thread=self)
+
+ def stop(self):
+ self.thread.cancel()
+
+ def wait(self):
+ return self.thread.wait()
+
+
+class ThreadGroup(object):
+ """
+ The point of this class is to:
+ - keep track of timers and greenthreads (making it easier to stop them
+ when need be).
+ - provide an easy API to add timers.
+ """
+ def __init__(self, name, thread_pool_size=10):
+ self.name = name
+ self.pool = greenpool.GreenPool(thread_pool_size)
+ self.threads = []
+ self.timers = []
+
+ def add_timer(self, interval, callback, initial_delay=None,
+ *args, **kwargs):
+ pulse = loopingcall.LoopingCall(callback, *args, **kwargs)
+ pulse.start(interval=interval,
+ initial_delay=initial_delay)
+ self.timers.append(pulse)
+
+ def add_thread(self, callback, *args, **kwargs):
+ gt = self.pool.spawn(callback, *args, **kwargs)
+ th = Thread(callback.__name__, gt, self)
+ self.threads.append(th)
+
+ def thread_done(self, thread):
+ self.threads.remove(thread)
+
+ def stop(self):
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ # don't kill the current thread.
+ continue
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+ self.timers = []
+
+ def wait(self):
+ for x in self.timers:
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ continue
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)