]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Add service.py from openstack-common
authorAngus Salkeld <asalkeld@redhat.com>
Thu, 1 Nov 2012 03:17:15 +0000 (14:17 +1100)
committerAngus Salkeld <asalkeld@redhat.com>
Fri, 2 Nov 2012 01:55:54 +0000 (12:55 +1100)
Change-Id: Ia4996d89d0fdba6dbeb44ae4cf3ca202c78886ee

heat/openstack/common/eventlet_backdoor.py [new file with mode: 0644]
heat/openstack/common/loopingcall.py [new file with mode: 0644]
heat/openstack/common/service.py [new file with mode: 0644]
heat/openstack/common/threadgroup.py [new file with mode: 0644]
openstack-common.conf

diff --git a/heat/openstack/common/eventlet_backdoor.py b/heat/openstack/common/eventlet_backdoor.py
new file mode 100644 (file)
index 0000000..c2e73dd
--- /dev/null
@@ -0,0 +1,78 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Openstack, LLC.
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import gc
+import pprint
+import sys
+import traceback
+
+import eventlet
+import eventlet.backdoor
+import greenlet
+
+from heat.openstack.common import cfg
+
+eventlet_backdoor_opts = [
+    cfg.IntOpt('backdoor_port',
+               default=None,
+               help='port for eventlet backdoor to listen')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(eventlet_backdoor_opts)
+
+
+def _dont_use_this():
+    print "Don't use this, just disconnect instead"
+
+
+def _find_objects(t):
+    return filter(lambda o: isinstance(o, t), gc.get_objects())
+
+
+def _print_greenthreads():
+    for i, gt in enumerate(find_objects(greenlet.greenlet)):
+        print i, gt
+        traceback.print_stack(gt.gr_frame)
+        print
+
+
+def initialize_if_enabled():
+    backdoor_locals = {
+        'exit': _dont_use_this,      # So we don't exit the entire process
+        'quit': _dont_use_this,      # So we don't exit the entire process
+        'fo': _find_objects,
+        'pgt': _print_greenthreads,
+    }
+
+    if CONF.backdoor_port is None:
+        return
+
+    # NOTE(johannes): The standard sys.displayhook will print the value of
+    # the last expression and set it to __builtin__._, which overwrites
+    # the __builtin__._ that gettext sets. Let's switch to using pprint
+    # since it won't interact poorly with gettext, and it's easier to
+    # read the output too.
+    def displayhook(val):
+        if val is not None:
+            pprint.pprint(val)
+    sys.displayhook = displayhook
+
+    eventlet.spawn(eventlet.backdoor.backdoor_server,
+                   eventlet.listen(('localhost', CONF.backdoor_port)),
+                   locals=backdoor_locals)
diff --git a/heat/openstack/common/loopingcall.py b/heat/openstack/common/loopingcall.py
new file mode 100644 (file)
index 0000000..bc7de8c
--- /dev/null
@@ -0,0 +1,88 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import sys
+
+from eventlet import event
+from eventlet import greenthread
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class LoopingCallDone(Exception):
+    """Exception to break out and stop a LoopingCall.
+
+    The poll-function passed to LoopingCall can raise this exception to
+    break out of the loop normally. This is somewhat analogous to
+    StopIteration.
+
+    An optional return-value can be included as the argument to the exception;
+    this return-value will be returned by LoopingCall.wait()
+
+    """
+
+    def __init__(self, retvalue=True):
+        """:param retvalue: Value that LoopingCall.wait() should return."""
+        self.retvalue = retvalue
+
+
+class LoopingCall(object):
+    def __init__(self, f=None, *args, **kw):
+        self.args = args
+        self.kw = kw
+        self.f = f
+        self._running = False
+
+    def start(self, interval, initial_delay=None):
+        self._running = True
+        done = event.Event()
+
+        def _inner():
+            if initial_delay:
+                greenthread.sleep(initial_delay)
+
+            try:
+                while self._running:
+                    self.f(*self.args, **self.kw)
+                    if not self._running:
+                        break
+                    greenthread.sleep(interval)
+            except LoopingCallDone, e:
+                self.stop()
+                done.send(e.retvalue)
+            except Exception:
+                LOG.exception(_('in looping call'))
+                done.send_exception(*sys.exc_info())
+                return
+            else:
+                done.send(True)
+
+        self.done = done
+
+        greenthread.spawn(_inner)
+        return self.done
+
+    def stop(self):
+        self._running = False
+
+    def wait(self):
+        return self.done.wait()
diff --git a/heat/openstack/common/service.py b/heat/openstack/common/service.py
new file mode 100644 (file)
index 0000000..913a4c7
--- /dev/null
@@ -0,0 +1,328 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import errno
+import os
+import random
+import signal
+import sys
+import time
+
+import eventlet
+import greenlet
+import logging as std_logging
+
+from heat.openstack.common import cfg
+from heat.openstack.common import eventlet_backdoor
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common import threadgroup
+
+try:
+    from heat.openstack.common import rpc
+except ImportError:
+    rpc = None
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class Launcher(object):
+    """Launch one or more services and wait for them to complete."""
+
+    def __init__(self):
+        """Initialize the service launcher.
+
+        :returns: None
+
+        """
+        self._services = []
+        eventlet_backdoor.initialize_if_enabled()
+
+    @staticmethod
+    def run_service(service):
+        """Start and wait for a service to finish.
+
+        :param service: service to run and wait for.
+        :returns: None
+
+        """
+        service.start()
+        service.wait()
+
+    def launch_service(self, service):
+        """Load and start the given service.
+
+        :param service: The service you would like to start.
+        :returns: None
+
+        """
+        gt = eventlet.spawn(self.run_service, service)
+        self._services.append(gt)
+
+    def stop(self):
+        """Stop all services which are currently running.
+
+        :returns: None
+
+        """
+        for service in self._services:
+            service.kill()
+
+    def wait(self):
+        """Waits until all services have been stopped, and then returns.
+
+        :returns: None
+
+        """
+        for service in self._services:
+            try:
+                service.wait()
+            except greenlet.GreenletExit:
+                pass
+
+
+class SignalExit(SystemExit):
+    def __init__(self, signo, exccode=1):
+        super(SignalExit, self).__init__(exccode)
+        self.signo = signo
+
+
+class ServiceLauncher(Launcher):
+    def _handle_signal(self, signo, frame):
+        # Allow the process to be killed again and die from natural causes
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+        raise SignalExit(signo)
+
+    def wait(self):
+        signal.signal(signal.SIGTERM, self._handle_signal)
+        signal.signal(signal.SIGINT, self._handle_signal)
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+        status = None
+        try:
+            super(ServiceLauncher, self).wait()
+        except SignalExit as exc:
+            signame = {signal.SIGTERM: 'SIGTERM',
+                       signal.SIGINT: 'SIGINT'}[exc.signo]
+            LOG.info(_('Caught %s, exiting'), signame)
+            status = exc.code
+        except SystemExit as exc:
+            status = exc.code
+        finally:
+            self.stop()
+            if rpc:
+                rpc.cleanup()
+        return status
+
+
+class ServiceWrapper(object):
+    def __init__(self, service, workers):
+        self.service = service
+        self.workers = workers
+        self.children = set()
+        self.forktimes = []
+
+
+class ProcessLauncher(object):
+    def __init__(self):
+        self.children = {}
+        self.sigcaught = None
+        self.running = True
+        rfd, self.writepipe = os.pipe()
+        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+        signal.signal(signal.SIGTERM, self._handle_signal)
+        signal.signal(signal.SIGINT, self._handle_signal)
+
+    def _handle_signal(self, signo, frame):
+        self.sigcaught = signo
+        self.running = False
+
+        # Allow the process to be killed again and die from natural causes
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+    def _pipe_watcher(self):
+        # This will block until the write end is closed when the parent
+        # dies unexpectedly
+        self.readpipe.read()
+
+        LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+        sys.exit(1)
+
+    def _child_process(self, service):
+        # Setup child signal handlers differently
+        def _sigterm(*args):
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            raise SignalExit(signal.SIGTERM)
+
+        signal.signal(signal.SIGTERM, _sigterm)
+        # Block SIGINT and let the parent send us a SIGTERM
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+        # Reopen the eventlet hub to make sure we don't share an epoll
+        # fd with parent and/or siblings, which would be bad
+        eventlet.hubs.use_hub()
+
+        # Close write to ensure only parent has it open
+        os.close(self.writepipe)
+        # Create greenthread to watch for parent to close pipe
+        eventlet.spawn(self._pipe_watcher)
+
+        # Reseed random number generator
+        random.seed()
+
+        launcher = Launcher()
+        launcher.run_service(service)
+
+    def _start_child(self, wrap):
+        if len(wrap.forktimes) > wrap.workers:
+            # Limit ourselves to one process a second (over the period of
+            # number of workers * 1 second). This will allow workers to
+            # start up quickly but ensure we don't fork off children that
+            # die instantly too quickly.
+            if time.time() - wrap.forktimes[0] < wrap.workers:
+                LOG.info(_('Forking too fast, sleeping'))
+                time.sleep(1)
+
+            wrap.forktimes.pop(0)
+
+        wrap.forktimes.append(time.time())
+
+        pid = os.fork()
+        if pid == 0:
+            # NOTE(johannes): All exceptions are caught to ensure this
+            # doesn't fallback into the loop spawning children. It would
+            # be bad for a child to spawn more children.
+            status = 0
+            try:
+                self._child_process(wrap.service)
+            except SignalExit as exc:
+                signame = {signal.SIGTERM: 'SIGTERM',
+                           signal.SIGINT: 'SIGINT'}[exc.signo]
+                LOG.info(_('Caught %s, exiting'), signame)
+                status = exc.code
+            except SystemExit as exc:
+                status = exc.code
+            except BaseException:
+                LOG.exception(_('Unhandled exception'))
+                status = 2
+            finally:
+                wrap.service.stop()
+
+            os._exit(status)
+
+        LOG.info(_('Started child %d'), pid)
+
+        wrap.children.add(pid)
+        self.children[pid] = wrap
+
+        return pid
+
+    def launch_service(self, service, workers=1):
+        wrap = ServiceWrapper(service, workers)
+
+        LOG.info(_('Starting %d workers'), wrap.workers)
+        while self.running and len(wrap.children) < wrap.workers:
+            self._start_child(wrap)
+
+    def _wait_child(self):
+        try:
+            pid, status = os.wait()
+        except OSError as exc:
+            if exc.errno not in (errno.EINTR, errno.ECHILD):
+                raise
+            return None
+
+        if os.WIFSIGNALED(status):
+            sig = os.WTERMSIG(status)
+            LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+        else:
+            code = os.WEXITSTATUS(status)
+            LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+
+        if pid not in self.children:
+            LOG.warning(_('pid %d not in child list'), pid)
+            return None
+
+        wrap = self.children.pop(pid)
+        wrap.children.remove(pid)
+        return wrap
+
+    def wait(self):
+        """Loop waiting on children to die and respawning as necessary"""
+        while self.running:
+            wrap = self._wait_child()
+            if not wrap:
+                continue
+
+            while self.running and len(wrap.children) < wrap.workers:
+                self._start_child(wrap)
+
+        if self.sigcaught:
+            signame = {signal.SIGTERM: 'SIGTERM',
+                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
+            LOG.info(_('Caught %s, stopping children'), signame)
+
+        for pid in self.children:
+            try:
+                os.kill(pid, signal.SIGTERM)
+            except OSError as exc:
+                if exc.errno != errno.ESRCH:
+                    raise
+
+        # Wait for children to die
+        if self.children:
+            LOG.info(_('Waiting on %d children to exit'), len(self.children))
+            while self.children:
+                self._wait_child()
+
+
+class Service(object):
+    """Service object for binaries running on hosts."""
+
+    def __init__(self):
+        self.tg = threadgroup.ThreadGroup('service')
+
+    def start(self):
+        pass
+
+    def stop(self):
+        self.tg.stop()
+
+    def wait(self):
+        self.tg.wait()
+
+
+def launch(service, workers=None):
+    if workers:
+        launcher = ProcessLauncher()
+        launcher.launch_service(service, workers=workers)
+    else:
+        launcher = ServiceLauncher()
+        launcher.launch_service(service)
+    return launcher
diff --git a/heat/openstack/common/threadgroup.py b/heat/openstack/common/threadgroup.py
new file mode 100644 (file)
index 0000000..feee2a0
--- /dev/null
@@ -0,0 +1,119 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from eventlet import greenlet
+from eventlet import greenpool
+from eventlet import greenthread
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common import loopingcall
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+    '''
+    Callback function to be passed to GreenThread.link() when we spawn()
+    Calls the ThreadGroup to notify if.
+    '''
+    kwargs['group'].thread_done(kwargs['thread'])
+
+
+class Thread(object):
+    """
+    Wrapper around a greenthread, that holds a reference to
+    the ThreadGroup. The Thread will notify the ThreadGroup
+    when it has done so it can be removed from the threads
+    list.
+    """
+    def __init__(self, name, thread, group):
+        self.name = name
+        self.thread = thread
+        self.thread.link(_thread_done, group=group, thread=self)
+
+    def stop(self):
+        self.thread.cancel()
+
+    def wait(self):
+        return self.thread.wait()
+
+
+class ThreadGroup(object):
+    """
+    The point of this class is to:
+    - keep track of timers and greenthreads (making it easier to stop them
+      when need be).
+    - provide an easy API to add timers.
+    """
+    def __init__(self, name, thread_pool_size=10):
+        self.name = name
+        self.pool = greenpool.GreenPool(thread_pool_size)
+        self.threads = []
+        self.timers = []
+
+    def add_timer(self, interval, callback, initial_delay=None,
+                  *args, **kwargs):
+        pulse = loopingcall.LoopingCall(callback, *args, **kwargs)
+        pulse.start(interval=interval,
+                    initial_delay=initial_delay)
+        self.timers.append(pulse)
+
+    def add_thread(self, callback, *args, **kwargs):
+        gt = self.pool.spawn(callback, *args, **kwargs)
+        th = Thread(callback.__name__, gt, self)
+        self.threads.append(th)
+
+    def thread_done(self, thread):
+        self.threads.remove(thread)
+
+    def stop(self):
+        current = greenthread.getcurrent()
+        for x in self.threads:
+            if x is current:
+                # don't kill the current thread.
+                continue
+            try:
+                x.stop()
+            except Exception as ex:
+                LOG.exception(ex)
+
+        for x in self.timers:
+            try:
+                x.stop()
+            except Exception as ex:
+                LOG.exception(ex)
+        self.timers = []
+
+    def wait(self):
+        for x in self.timers:
+            try:
+                x.wait()
+            except greenlet.GreenletExit:
+                pass
+            except Exception as ex:
+                LOG.exception(ex)
+        current = greenthread.getcurrent()
+        for x in self.threads:
+            if x is current:
+                continue
+            try:
+                x.wait()
+            except greenlet.GreenletExit:
+                pass
+            except Exception as ex:
+                LOG.exception(ex)
index a87665559738c915e6dd8c43aff6622e264e9059..a940e54afdaee3ec104aa3d36cec44a7b6a38c19 100644 (file)
@@ -1,7 +1,7 @@
 [DEFAULT]
 
 # The list of modules to copy from openstack-common
-modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier,rpc,excutils
+modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier,rpc,excutils,service,threadgroup,eventlet_backdoor,loopingcall
 
 # The base module to hold the copy of openstack.common
 base=heat