review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Update to latest copy of OSLO incubator
author    Michael Kerrin <michael.kerrin@hp.com>
Wed, 17 Apr 2013 16:41:50 +0000 (16:41 +0000)
committer Michael Kerrin <michael.kerrin@hp.com>
Wed, 17 Apr 2013 17:17:05 +0000 (17:17 +0000)
Specifically, I wanted oslo-incubator change be84db3ab24ef94b6ec457bb299d48c51575e8a6,
which fixes my logging issue.

I ignored the changes in common/policy.py, as that change breaks
the cinder unit tests.

Fixes bug: 1170038

Change-Id: Id72417d58c8f4bf139aa082131154153a175689d

18 files changed:
cinder/openstack/common/eventlet_backdoor.py [new file with mode: 0644]
cinder/openstack/common/gettextutils.py
cinder/openstack/common/jsonutils.py
cinder/openstack/common/log.py
cinder/openstack/common/loopingcall.py [new file with mode: 0644]
cinder/openstack/common/notifier/api.py
cinder/openstack/common/rootwrap/cmd.py [new file with mode: 0755]
cinder/openstack/common/rootwrap/filters.py
cinder/openstack/common/rpc/amqp.py
cinder/openstack/common/rpc/common.py
cinder/openstack/common/rpc/dispatcher.py
cinder/openstack/common/rpc/impl_fake.py
cinder/openstack/common/rpc/impl_qpid.py
cinder/openstack/common/rpc/impl_zmq.py
cinder/openstack/common/rpc/proxy.py
cinder/openstack/common/rpc/zmq_receiver.py [new file with mode: 0755]
cinder/openstack/common/service.py [new file with mode: 0644]
cinder/openstack/common/threadgroup.py [new file with mode: 0644]

diff --git a/cinder/openstack/common/eventlet_backdoor.py b/cinder/openstack/common/eventlet_backdoor.py
new file mode 100644 (file)
index 0000000..c0ad460
--- /dev/null
@@ -0,0 +1,87 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack Foundation.
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import gc
+import pprint
+import sys
+import traceback
+
+import eventlet
+import eventlet.backdoor
+import greenlet
+from oslo.config import cfg
+
+eventlet_backdoor_opts = [
+    cfg.IntOpt('backdoor_port',
+               default=None,
+               help='port for eventlet backdoor to listen')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(eventlet_backdoor_opts)
+
+
+def _dont_use_this():
+    print "Don't use this, just disconnect instead"
+
+
+def _find_objects(t):
+    return filter(lambda o: isinstance(o, t), gc.get_objects())
+
+
+def _print_greenthreads():
+    for i, gt in enumerate(_find_objects(greenlet.greenlet)):
+        print i, gt
+        traceback.print_stack(gt.gr_frame)
+        print
+
+
+def _print_nativethreads():
+    for threadId, stack in sys._current_frames().items():
+        print threadId
+        traceback.print_stack(stack)
+        print
+
+
+def initialize_if_enabled():
+    backdoor_locals = {
+        'exit': _dont_use_this,      # So we don't exit the entire process
+        'quit': _dont_use_this,      # So we don't exit the entire process
+        'fo': _find_objects,
+        'pgt': _print_greenthreads,
+        'pnt': _print_nativethreads,
+    }
+
+    if CONF.backdoor_port is None:
+        return None
+
+    # NOTE(johannes): The standard sys.displayhook will print the value of
+    # the last expression and set it to __builtin__._, which overwrites
+    # the __builtin__._ that gettext sets. Let's switch to using pprint
+    # since it won't interact poorly with gettext, and it's easier to
+    # read the output too.
+    def displayhook(val):
+        if val is not None:
+            pprint.pprint(val)
+    sys.displayhook = displayhook
+
+    sock = eventlet.listen(('localhost', CONF.backdoor_port))
+    port = sock.getsockname()[1]
+    eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
+                     locals=backdoor_locals)
+    return port
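
As a rough usage sketch (the option would normally be set in cinder.conf; the
port value here is arbitrary and the telnet step is an assumption about
typical use):

    from oslo.config import cfg

    from cinder.openstack.common import eventlet_backdoor

    cfg.CONF.set_override('backdoor_port', 9999)
    port = eventlet_backdoor.initialize_if_enabled()
    # Returns the bound port, or None when backdoor_port is unset.
    # From a shell, `telnet localhost 9999` gives an interactive console
    # with the fo(), pgt() and pnt() helpers available.
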
diff --git a/cinder/openstack/common/gettextutils.py b/cinder/openstack/common/gettextutils.py
index 87e3520e6cc4ae4a64b02cf5c8ab7965555e3dd0..5c0540b43d49e054059d81dee03dd7c9dbb4cbf4 100644 (file)
@@ -24,10 +24,27 @@ Usual usage in an openstack.common module:
 """
 
 import gettext
+import os
 
-
-t = gettext.translation('openstack-common', 'locale', fallback=True)
+_localedir = os.environ.get('cinder'.upper() + '_LOCALEDIR')
+_t = gettext.translation('cinder', localedir=_localedir, fallback=True)
 
 
 def _(msg):
-    return t.ugettext(msg)
+    return _t.ugettext(msg)
+
+
+def install(domain):
+    """Install a _() function using the given translation domain.
+
+    Given a translation domain, install a _() function using gettext's
+    install() function.
+
+    The main difference from gettext.install() is that we allow
+    overriding the default localedir (e.g. /usr/share/locale) using
+    a translation-domain-specific environment variable (e.g.
+    NOVA_LOCALEDIR).
+    """
+    gettext.install(domain,
+                    localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
+                    unicode=True)
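
A minimal sketch of the new install() call, as a service entry point might use
it (where cinder actually invokes it is not shown in this diff):

    from cinder.openstack.common import gettextutils

    gettextutils.install('cinder')
    # Installs _() into builtins; CINDER_LOCALEDIR, if set, overrides the
    # default localedir for the 'cinder' translation domain.
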
diff --git a/cinder/openstack/common/jsonutils.py b/cinder/openstack/common/jsonutils.py
index 1e9262ed0df845483c48eef11870863019733040..70134d4192cc76ebdb67b01fe7425536bf341d6b 100644 (file)
@@ -38,11 +38,21 @@ import functools
 import inspect
 import itertools
 import json
+import types
 import xmlrpclib
 
 from cinder.openstack.common import timeutils
 
 
+_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+                     inspect.isfunction, inspect.isgeneratorfunction,
+                     inspect.isgenerator, inspect.istraceback, inspect.isframe,
+                     inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+                     inspect.isabstract]
+
+_simple_types = (types.NoneType, int, basestring, bool, float, long)
+
+
 def to_primitive(value, convert_instances=False, convert_datetime=True,
                  level=0, max_depth=3):
     """Convert a complex object into primitives.
@@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
     Therefore, convert_instances=True is lossy ... be aware.
 
     """
-    nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
-             inspect.isfunction, inspect.isgeneratorfunction,
-             inspect.isgenerator, inspect.istraceback, inspect.isframe,
-             inspect.iscode, inspect.isbuiltin, inspect.isroutine,
-             inspect.isabstract]
-    for test in nasty:
-        if test(value):
-            return unicode(value)
-
-    # value of itertools.count doesn't get caught by inspects
-    # above and results in infinite loop when list(value) is called.
+    # handle obvious types first - order of basic types determined by running
+    # full tests on nova project, resulting in the following counts:
+    # 572754 <type 'NoneType'>
+    # 460353 <type 'int'>
+    # 379632 <type 'unicode'>
+    # 274610 <type 'str'>
+    # 199918 <type 'dict'>
+    # 114200 <type 'datetime.datetime'>
+    #  51817 <type 'bool'>
+    #  26164 <type 'list'>
+    #   6491 <type 'float'>
+    #    283 <type 'tuple'>
+    #     19 <type 'long'>
+    if isinstance(value, _simple_types):
+        return value
+
+    if isinstance(value, datetime.datetime):
+        if convert_datetime:
+            return timeutils.strtime(value)
+        else:
+            return value
+
+    # value of itertools.count doesn't get caught by nasty_type_tests
+    # and results in infinite loop when list(value) is called.
     if type(value) == itertools.count:
         return unicode(value)
 
@@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
                                       convert_datetime=convert_datetime,
                                       level=level,
                                       max_depth=max_depth)
+        if isinstance(value, dict):
+            return dict((k, recursive(v)) for k, v in value.iteritems())
+        elif isinstance(value, (list, tuple)):
+            return [recursive(lv) for lv in value]
+
         # It's not clear why xmlrpclib created their own DateTime type, but
         # for our purposes, make it a datetime type which is explicitly
         # handled
         if isinstance(value, xmlrpclib.DateTime):
             value = datetime.datetime(*tuple(value.timetuple())[:6])
 
-        if isinstance(value, (list, tuple)):
-            return [recursive(v) for v in value]
-        elif isinstance(value, dict):
-            return dict((k, recursive(v)) for k, v in value.iteritems())
-        elif convert_datetime and isinstance(value, datetime.datetime):
+        if convert_datetime and isinstance(value, datetime.datetime):
             return timeutils.strtime(value)
         elif hasattr(value, 'iteritems'):
             return recursive(dict(value.iteritems()), level=level + 1)
@@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
             # Ignore class member vars.
             return recursive(value.__dict__, level=level + 1)
         else:
+            if any(test(value) for test in _nasty_type_tests):
+                return unicode(value)
             return value
     except TypeError:
         # Class objects are tricky since they may define something like
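
For illustration, the reordering means the simple values inside a container
now hit the early isinstance checks instead of running every nasty-type test
first (the dict below is hypothetical):

    import datetime

    from cinder.openstack.common import jsonutils

    jsonutils.to_primitive({'size': 10, 'created_at': datetime.datetime.utcnow()})
    # -> {'size': 10, 'created_at': <string produced by timeutils.strtime()>}
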
diff --git a/cinder/openstack/common/log.py b/cinder/openstack/common/log.py
index 9f321b94675ae6141d3fcd0f4c57790c828c56cf..c170971f958cb2cff946a3367e48054112cef71d 100644 (file)
@@ -29,6 +29,7 @@ It also allows setting of formatting information through conf.
 
 """
 
+import ConfigParser
 import cStringIO
 import inspect
 import itertools
@@ -87,11 +88,11 @@ logging_cli_opts = [
                metavar='PATH',
                deprecated_name='logfile',
                help='(Optional) Name of log file to output to. '
-                    'If not set, logging will go to stdout.'),
+                    'If no default is set, logging will go to stdout.'),
     cfg.StrOpt('log-dir',
                deprecated_name='logdir',
-               help='(Optional) The directory to keep log files in '
-                    '(will be prepended to --log-file)'),
+               help='(Optional) The base directory used for relative '
+                    '--log-file paths'),
     cfg.BoolOpt('use-syslog',
                 default=False,
                 help='Use syslog for logging.'),
@@ -111,9 +112,9 @@ generic_log_opts = [
 
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
-               default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
-                       '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
-                       '%(message)s',
+               default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+                       '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+                       '%(instance)s%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
                default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
@@ -323,10 +324,30 @@ def _create_logging_excepthook(product_name):
     return logging_excepthook
 
 
+class LogConfigError(Exception):
+
+    message = _('Error loading logging config %(log_config)s: %(err_msg)s')
+
+    def __init__(self, log_config, err_msg):
+        self.log_config = log_config
+        self.err_msg = err_msg
+
+    def __str__(self):
+        return self.message % dict(log_config=self.log_config,
+                                   err_msg=self.err_msg)
+
+
+def _load_log_config(log_config):
+    try:
+        logging.config.fileConfig(log_config)
+    except ConfigParser.Error, exc:
+        raise LogConfigError(log_config, str(exc))
+
+
 def setup(product_name):
     """Setup logging."""
     if CONF.log_config:
-        logging.config.fileConfig(CONF.log_config)
+        _load_log_config(CONF.log_config)
     else:
         _setup_logging_from_conf()
     sys.excepthook = _create_logging_excepthook(product_name)
@@ -411,14 +432,11 @@ def _setup_logging_from_conf():
     else:
         log_root.setLevel(logging.WARNING)
 
-    level = logging.NOTSET
     for pair in CONF.default_log_levels:
         mod, _sep, level_name = pair.partition('=')
         level = logging.getLevelName(level_name)
         logger = logging.getLogger(mod)
         logger.setLevel(level)
-        for handler in log_root.handlers:
-            logger.addHandler(handler)
 
 _loggers = {}
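
A sketch of what the new wrapper buys callers (the config path is
hypothetical):

    from cinder.openstack.common import log as logging

    # With log_config pointing at a malformed file, e.g.
    #   log_config = /etc/cinder/logging.conf
    # setup() now fails with a descriptive LogConfigError instead of letting
    # a bare ConfigParser.Error propagate.
    try:
        logging.setup('cinder')
    except logging.LogConfigError as exc:
        raise SystemExit(str(exc))
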
 
diff --git a/cinder/openstack/common/loopingcall.py b/cinder/openstack/common/loopingcall.py
new file mode 100644 (file)
index 0000000..8be3a00
--- /dev/null
@@ -0,0 +1,147 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import sys
+
+from eventlet import event
+from eventlet import greenthread
+
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
+
+LOG = logging.getLogger(__name__)
+
+
+class LoopingCallDone(Exception):
+    """Exception to break out and stop a LoopingCall.
+
+    The poll-function passed to LoopingCall can raise this exception to
+    break out of the loop normally. This is somewhat analogous to
+    StopIteration.
+
+    An optional return-value can be included as the argument to the exception;
+    this return-value will be returned by LoopingCall.wait()
+
+    """
+
+    def __init__(self, retvalue=True):
+        """:param retvalue: Value that LoopingCall.wait() should return."""
+        self.retvalue = retvalue
+
+
+class LoopingCallBase(object):
+    def __init__(self, f=None, *args, **kw):
+        self.args = args
+        self.kw = kw
+        self.f = f
+        self._running = False
+        self.done = None
+
+    def stop(self):
+        self._running = False
+
+    def wait(self):
+        return self.done.wait()
+
+
+class FixedIntervalLoopingCall(LoopingCallBase):
+    """A fixed interval looping call."""
+
+    def start(self, interval, initial_delay=None):
+        self._running = True
+        done = event.Event()
+
+        def _inner():
+            if initial_delay:
+                greenthread.sleep(initial_delay)
+
+            try:
+                while self._running:
+                    start = timeutils.utcnow()
+                    self.f(*self.args, **self.kw)
+                    end = timeutils.utcnow()
+                    if not self._running:
+                        break
+                    delay = interval - timeutils.delta_seconds(start, end)
+                    if delay <= 0:
+                        LOG.warn(_('task run outlasted interval by %s sec') %
+                                 -delay)
+                    greenthread.sleep(delay if delay > 0 else 0)
+            except LoopingCallDone, e:
+                self.stop()
+                done.send(e.retvalue)
+            except Exception:
+                LOG.exception(_('in fixed duration looping call'))
+                done.send_exception(*sys.exc_info())
+                return
+            else:
+                done.send(True)
+
+        self.done = done
+
+        greenthread.spawn_n(_inner)
+        return self.done
+
+
+# TODO(mikal): this class name is deprecated in Havana and should be removed
+# in the I release
+LoopingCall = FixedIntervalLoopingCall
+
+
+class DynamicLoopingCall(LoopingCallBase):
+    """A looping call which sleeps until the next known event.
+
+    The function called should return how long to sleep for before being
+    called again.
+    """
+
+    def start(self, initial_delay=None, periodic_interval_max=None):
+        self._running = True
+        done = event.Event()
+
+        def _inner():
+            if initial_delay:
+                greenthread.sleep(initial_delay)
+
+            try:
+                while self._running:
+                    idle = self.f(*self.args, **self.kw)
+                    if not self._running:
+                        break
+
+                    if periodic_interval_max is not None:
+                        idle = min(idle, periodic_interval_max)
+                    LOG.debug(_('Dynamic looping call sleeping for %.02f '
+                                'seconds'), idle)
+                    greenthread.sleep(idle)
+            except LoopingCallDone, e:
+                self.stop()
+                done.send(e.retvalue)
+            except Exception:
+                LOG.exception(_('in dynamic looping call'))
+                done.send_exception(*sys.exc_info())
+                return
+            else:
+                done.send(True)
+
+        self.done = done
+
+        greenthread.spawn(_inner)
+        return self.done
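
A brief usage sketch for the two call styles (the intervals and callbacks are
illustrative only):

    from cinder.openstack.common import loopingcall

    def _beat():
        pass  # periodic work goes here

    timer = loopingcall.FixedIntervalLoopingCall(_beat)
    timer.start(interval=10, initial_delay=5)

    def _poll():
        return 30.0  # seconds until the next run

    poller = loopingcall.DynamicLoopingCall(_poll)
    poller.start(periodic_interval_max=60)

    # Either loop can be stopped with .stop(); .wait() returns the
    # LoopingCallDone retvalue (or True) once the loop exits.
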
diff --git a/cinder/openstack/common/notifier/api.py b/cinder/openstack/common/notifier/api.py
index 38b2daba738c3c5f9c12cd3ecb0a3c21bb5eeaf7..6b82e4451e0e1e01b137ca32384f94997a2a59f6 100644 (file)
@@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__)
 notifier_opts = [
     cfg.MultiStrOpt('notification_driver',
                     default=[],
-                    deprecated_name='list_notifier_drivers',
                     help='Driver or drivers to handle sending notifications'),
     cfg.StrOpt('default_notification_level',
                default='INFO',
diff --git a/cinder/openstack/common/rootwrap/cmd.py b/cinder/openstack/common/rootwrap/cmd.py
new file mode 100755 (executable)
index 0000000..78265e3
--- /dev/null
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Root wrapper for OpenStack services
+
+   Filters which commands a service is allowed to run as another user.
+
+   To use this with cinder, you should set the following in
+   cinder.conf:
+   rootwrap_config=/etc/cinder/rootwrap.conf
+
+   You also need to let the cinder user run cinder-rootwrap
+   as root in sudoers:
+   cinder ALL = (root) NOPASSWD: /usr/bin/cinder-rootwrap
+                                   /etc/cinder/rootwrap.conf *
+
+   Service packaging should deploy .filters files only on nodes where
+   they are needed, to avoid allowing more than is necessary.
+"""
+
+import ConfigParser
+import logging
+import os
+import pwd
+import signal
+import subprocess
+import sys
+
+
+RC_UNAUTHORIZED = 99
+RC_NOCOMMAND = 98
+RC_BADCONFIG = 97
+RC_NOEXECFOUND = 96
+
+
+def _subprocess_setup():
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def _exit_error(execname, message, errorcode, log=True):
+    print "%s: %s" % (execname, message)
+    if log:
+        logging.error(message)
+    sys.exit(errorcode)
+
+
+def main():
+    # Split arguments, require at least a command
+    execname = sys.argv.pop(0)
+    if len(sys.argv) < 2:
+        _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
+
+    configfile = sys.argv.pop(0)
+    userargs = sys.argv[:]
+
+    # Add ../ to sys.path to allow running from branch
+    possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
+                                                    os.pardir, os.pardir))
+    if os.path.exists(os.path.join(possible_topdir, "cinder", "__init__.py")):
+        sys.path.insert(0, possible_topdir)
+
+    from cinder.openstack.common.rootwrap import wrapper
+
+    # Load configuration
+    try:
+        rawconfig = ConfigParser.RawConfigParser()
+        rawconfig.read(configfile)
+        config = wrapper.RootwrapConfig(rawconfig)
+    except ValueError as exc:
+        msg = "Incorrect value in %s: %s" % (configfile, exc.message)
+        _exit_error(execname, msg, RC_BADCONFIG, log=False)
+    except ConfigParser.Error:
+        _exit_error(execname, "Incorrect configuration file: %s" % configfile,
+                    RC_BADCONFIG, log=False)
+
+    if config.use_syslog:
+        wrapper.setup_syslog(execname,
+                             config.syslog_log_facility,
+                             config.syslog_log_level)
+
+    # Execute command if it matches any of the loaded filters
+    filters = wrapper.load_filters(config.filters_path)
+    try:
+        filtermatch = wrapper.match_filter(filters, userargs,
+                                           exec_dirs=config.exec_dirs)
+        if filtermatch:
+            command = filtermatch.get_command(userargs,
+                                              exec_dirs=config.exec_dirs)
+            if config.use_syslog:
+                logging.info("(%s > %s) Executing %s (filter match = %s)" % (
+                    os.getlogin(), pwd.getpwuid(os.getuid())[0],
+                    command, filtermatch.name))
+
+            obj = subprocess.Popen(command,
+                                   stdin=sys.stdin,
+                                   stdout=sys.stdout,
+                                   stderr=sys.stderr,
+                                   preexec_fn=_subprocess_setup,
+                                   env=filtermatch.get_environment(userargs))
+            obj.wait()
+            sys.exit(obj.returncode)
+
+    except wrapper.FilterMatchNotExecutable as exc:
+        msg = ("Executable not found: %s (filter match = %s)"
+               % (exc.match.exec_path, exc.match.name))
+        _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
+
+    except wrapper.NoFilterMatched:
+        msg = ("Unauthorized command: %s (no filter matched)"
+               % ' '.join(userargs))
+        _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
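
For context, a hypothetical /etc/cinder/rootwrap.conf matching the options the
wrapper reads above; the key names are inferred from wrapper.RootwrapConfig,
which is not part of this diff, so treat them as assumptions:

    [DEFAULT]
    filters_path=/etc/cinder/rootwrap.d,/usr/share/cinder/rootwrap
    exec_dirs=/sbin,/usr/sbin,/bin,/usr/bin
    use_syslog=False
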
diff --git a/cinder/openstack/common/rootwrap/filters.py b/cinder/openstack/common/rootwrap/filters.py
index eadda256ca8a5ce04da71c3304116e54bbaa6606..d9618af8834089f7e3e99561b6b421cdc8a0e7c7 100644 (file)
@@ -88,6 +88,52 @@ class RegExpFilter(CommandFilter):
         return False
 
 
+class PathFilter(CommandFilter):
+    """Command filter checking that path arguments are within given dirs
+
+        One can specify the following constraints for command arguments:
+            1) pass     - pass an argument as is to the resulting command
+            2) some_str - check if an argument is equal to the given string
+            3) abs path - check if a path argument is within the given base dir
+
+        A typical rootwrapper filter entry looks like this:
+            # cmdname: filter name, raw command, user, arg_i_constraint [, ...]
+            chown: PathFilter, /bin/chown, root, nova, /var/lib/images
+
+    """
+
+    def match(self, userargs):
+        command, arguments = userargs[0], userargs[1:]
+
+        equal_args_num = len(self.args) == len(arguments)
+        exec_is_valid = super(PathFilter, self).match(userargs)
+        args_equal_or_pass = all(
+            arg == 'pass' or arg == value
+            for arg, value in zip(self.args, arguments)
+            if not os.path.isabs(arg)  # arguments not specifying abs paths
+        )
+        paths_are_within_base_dirs = all(
+            os.path.commonprefix([arg, os.path.realpath(value)]) == arg
+            for arg, value in zip(self.args, arguments)
+            if os.path.isabs(arg)  # arguments specifying abs paths
+        )
+
+        return (equal_args_num and
+                exec_is_valid and
+                args_equal_or_pass and
+                paths_are_within_base_dirs)
+
+    def get_command(self, userargs, exec_dirs=[]):
+        command, arguments = userargs[0], userargs[1:]
+
+        # convert path values to canonical ones; copy other args as is
+        args = [os.path.realpath(value) if os.path.isabs(arg) else value
+                for arg, value in zip(self.args, arguments)]
+
+        return super(PathFilter, self).get_command([command] + args,
+                                                   exec_dirs)
+
+
 class DnsmasqFilter(CommandFilter):
     """Specific filter for the dnsmasq call (which includes env)"""
 
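
A worked example of the new PathFilter (the paths and instance name are
hypothetical):

    # filter definition (in a .filters file):
    #   chown: PathFilter, /bin/chown, root, nova, /var/lib/images
    # incoming userargs:
    #   ['chown', 'nova', '/var/lib/images/instance-0001/disk']
    # match(): the argument count matches, 'nova' equals the literal
    # constraint, and the path resolves under /var/lib/images, so the command
    # is allowed. get_command() canonicalises the path and returns:
    #   ['/bin/chown', 'nova', '/var/lib/images/instance-0001/disk']
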
diff --git a/cinder/openstack/common/rpc/amqp.py b/cinder/openstack/common/rpc/amqp.py
index 832511cd73f289f519e10482fe8720812977f9a9..9addfa1c76391489b56771a14fe2c66d3df3e60e 100644 (file)
@@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait):
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
         args = message_data.get('args', {})
-        version = message_data.get('version', None)
+        version = message_data.get('version')
+        namespace = message_data.get('namespace')
         if not method:
             LOG.warn(_('no method for message: %s') % message_data)
             ctxt.reply(_('No method for message: %s') % message_data,
                        connection_pool=self.connection_pool)
             return
-        self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+        self.pool.spawn_n(self._process_data, ctxt, version, method,
+                          namespace, args)
 
-    def _process_data(self, ctxt, version, method, args):
+    def _process_data(self, ctxt, version, method, namespace, args):
         """Process a message in a new thread.
 
         If the proxy object we have has a dispatch method
@@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait):
         """
         ctxt.update_store()
         try:
-            rval = self.proxy.dispatch(ctxt, version, method, **args)
+            rval = self.proxy.dispatch(ctxt, version, method, namespace,
+                                       **args)
             # Check if the result was a generator
             if inspect.isgenerator(rval):
                 for x in rval:
@@ -495,7 +498,6 @@ class MulticallProxyWaiter(object):
                 data = self._dataqueue.get(timeout=self._timeout)
                 result = self._process_data(data)
             except queue.Empty:
-                LOG.exception(_('Timed out waiting for RPC response.'))
                 self.done()
                 raise rpc_common.Timeout()
             except Exception:
@@ -662,7 +664,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         if envelope:
-            msg = rpc_common.serialize_msg(msg, force_envelope=True)
+            msg = rpc_common.serialize_msg(msg)
         conn.notify_send(topic, msg)
 
 
diff --git a/cinder/openstack/common/rpc/common.py b/cinder/openstack/common/rpc/common.py
index b2d1e91ff9db372980faeb0515425e3bfcda0949..9f0552e5e9060d3524d6a7aac357f058c6dc244c 100644 (file)
@@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version'
 _MESSAGE_KEY = 'oslo.message'
 
 
-# TODO(russellb) Turn this on after Grizzly.
-_SEND_RPC_ENVELOPE = False
-
-
 class RPCException(Exception):
     message = _("An unknown RPC related exception occurred.")
 
@@ -122,7 +118,25 @@ class Timeout(RPCException):
     This exception is raised if the rpc_response_timeout is reached while
     waiting for a response from the remote side.
     """
-    message = _("Timeout while waiting on RPC response.")
+    message = _('Timeout while waiting on RPC response - '
+                'topic: "%(topic)s", RPC method: "%(method)s" '
+                'info: "%(info)s"')
+
+    def __init__(self, info=None, topic=None, method=None):
+        """
+        :param info: Extra info to convey to the user
+        :param topic: The topic that the rpc call was sent to
+        :param rpc_method_name: The name of the rpc method being
+                                called
+        """
+        self.info = info
+        self.topic = topic
+        self.method = method
+        super(Timeout, self).__init__(
+            None,
+            info=info or _('<unknown>'),
+            topic=topic or _('<unknown>'),
+            method=method or _('<unknown>'))
 
 
 class DuplicateMessageError(RPCException):
@@ -325,7 +339,7 @@ def deserialize_remote_exception(conf, data):
         if not issubclass(klass, Exception):
             raise TypeError("Can only deserialize Exceptions")
 
-        failure = klass(**failure.get('kwargs', {}))
+        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
     except (AttributeError, TypeError, ImportError):
         return RemoteError(name, failure.get('message'), trace)
 
@@ -441,10 +455,7 @@ def version_is_compatible(imp_version, version):
     return True
 
 
-def serialize_msg(raw_msg, force_envelope=False):
-    if not _SEND_RPC_ENVELOPE and not force_envelope:
-        return raw_msg
-
+def serialize_msg(raw_msg):
     # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
     # information about this format.
     msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
diff --git a/cinder/openstack/common/rpc/dispatcher.py b/cinder/openstack/common/rpc/dispatcher.py
index 7734a7fb7e0c2b02fd470876f1477046f7ddae73..85195d4a74436df185040378abed672fbf7f90d6 100644 (file)
@@ -103,13 +103,16 @@ class RpcDispatcher(object):
         self.callbacks = callbacks
         super(RpcDispatcher, self).__init__()
 
-    def dispatch(self, ctxt, version, method, **kwargs):
+    def dispatch(self, ctxt, version, method, namespace, **kwargs):
         """Dispatch a message based on a requested version.
 
         :param ctxt: The request context
         :param version: The requested API version from the incoming message
         :param method: The method requested to be called by the incoming
                        message.
+        :param namespace: The namespace for the requested method.  If None,
+                          the dispatcher will look for a method on a callback
+                          object with no namespace set.
         :param kwargs: A dict of keyword arguments to be passed to the method.
 
         :returns: Whatever is returned by the underlying method that gets
@@ -120,13 +123,25 @@ class RpcDispatcher(object):
 
         had_compatible = False
         for proxyobj in self.callbacks:
-            if hasattr(proxyobj, 'RPC_API_VERSION'):
+            # Check for namespace compatibility
+            try:
+                cb_namespace = proxyobj.RPC_API_NAMESPACE
+            except AttributeError:
+                cb_namespace = None
+
+            if namespace != cb_namespace:
+                continue
+
+            # Check for version compatibility
+            try:
                 rpc_api_version = proxyobj.RPC_API_VERSION
-            else:
+            except AttributeError:
                 rpc_api_version = '1.0'
+
             is_compatible = rpc_common.version_is_compatible(rpc_api_version,
                                                              version)
             had_compatible = had_compatible or is_compatible
+
             if not hasattr(proxyobj, method):
                 continue
             if is_compatible:
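
Illustratively, a callback object can now scope its methods to a namespace
(the class and method names here are made up):

    class BaseAPI(object):
        RPC_API_NAMESPACE = 'baseapi'
        RPC_API_VERSION = '1.0'

        def ping(self, context, arg):
            return arg

    # dispatcher.dispatch(ctxt, '1.0', 'ping', 'baseapi', arg=42) routes to
    # BaseAPI above; messages with namespace=None skip it and only reach
    # callbacks that declare no RPC_API_NAMESPACE.
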
diff --git a/cinder/openstack/common/rpc/impl_fake.py b/cinder/openstack/common/rpc/impl_fake.py
index 0f2f53500189cef17e858a0cea84b670ab1d9a40..ec7200a7b68f8936eeccb28a71b6a09b692677fa 100644 (file)
@@ -57,13 +57,14 @@ class Consumer(object):
         self.topic = topic
         self.proxy = proxy
 
-    def call(self, context, version, method, args, timeout):
+    def call(self, context, version, method, namespace, args, timeout):
         done = eventlet.event.Event()
 
         def _inner():
             ctxt = RpcContext.from_dict(context.to_dict())
             try:
-                rval = self.proxy.dispatch(context, version, method, **args)
+                rval = self.proxy.dispatch(context, version, method,
+                                           namespace, **args)
                 res = []
                 # Caller might have called ctxt.reply() manually
                 for (reply, failure) in ctxt._response:
@@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
         return
     args = msg.get('args', {})
     version = msg.get('version', None)
+    namespace = msg.get('namespace', None)
 
     try:
         consumer = CONSUMERS[topic][0]
     except (KeyError, IndexError):
         return iter([None])
     else:
-        return consumer.call(context, version, method, args, timeout)
+        return consumer.call(context, version, method, namespace, args,
+                             timeout)
 
 
 def call(conf, context, topic, msg, timeout=None):
@@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
         return
     args = msg.get('args', {})
     version = msg.get('version', None)
+    namespace = msg.get('namespace', None)
 
     for consumer in CONSUMERS.get(topic, []):
         try:
-            consumer.call(context, version, method, args, None)
+            consumer.call(context, version, method, namespace, args, None)
         except Exception:
             pass
diff --git a/cinder/openstack/common/rpc/impl_qpid.py b/cinder/openstack/common/rpc/impl_qpid.py
index 5f181cdd2c5cc3c8305fb8fc897c9b83267acc3a..24235b1f11fb2fc80f741489be4af1c321c422cf 100644 (file)
@@ -40,8 +40,8 @@ qpid_opts = [
     cfg.StrOpt('qpid_hostname',
                default='localhost',
                help='Qpid broker hostname'),
-    cfg.StrOpt('qpid_port',
-               default='5672',
+    cfg.IntOpt('qpid_port',
+               default=5672,
                help='Qpid broker port'),
     cfg.ListOpt('qpid_hosts',
                 default=['$qpid_hostname:$qpid_port'],
@@ -320,7 +320,7 @@ class Connection(object):
         # Reconnection is done by self.reconnect()
         self.connection.reconnect = False
         self.connection.heartbeat = self.conf.qpid_heartbeat
-        self.connection.protocol = self.conf.qpid_protocol
+        self.connection.transport = self.conf.qpid_protocol
         self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
     def _register_consumer(self, consumer):
diff --git a/cinder/openstack/common/rpc/impl_zmq.py b/cinder/openstack/common/rpc/impl_zmq.py
index 6e3d93bada4ef70ca2dd6f8eabaf0901ed472f4a..d3d3599e8f450d3bee02cfc46618e1bb9698c04c 100644 (file)
@@ -221,7 +221,7 @@ class ZmqClient(object):
     def cast(self, msg_id, topic, data, envelope=False):
         msg_id = msg_id or 0
 
-        if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+        if not envelope:
             self.outq.send(map(bytes,
                            (msg_id, topic, 'cast', _serialize(data))))
             return
@@ -276,7 +276,8 @@ class InternalContext(object):
 
         try:
             result = proxy.dispatch(
-                ctx, data['version'], data['method'], **data['args'])
+                ctx, data['version'], data['method'],
+                data.get('namespace'), **data['args'])
             return ConsumerBase.normalize_reply(result, ctx.replies)
         except greenlet.GreenletExit:
             # ignore these since they are just from shutdowns
@@ -295,11 +296,16 @@ class InternalContext(object):
     def reply(self, ctx, proxy,
               msg_id=None, context=None, topic=None, msg=None):
         """Reply to a casted call."""
-        # Our real method is curried into msg['args']
+        # NOTE(ewindisch): context kwarg exists for Grizzly compat.
+        #                  this may be able to be removed earlier than
+        #                  'I' if ConsumerBase.process were refactored.
+        if type(msg) is list:
+            payload = msg[-1]
+        else:
+            payload = msg
 
-        child_ctx = RpcContext.unmarshal(msg[0])
         response = ConsumerBase.normalize_reply(
-            self._get_response(child_ctx, proxy, topic, msg[1]),
+            self._get_response(ctx, proxy, topic, payload),
             ctx.replies)
 
         LOG.debug(_("Sending reply"))
@@ -346,7 +352,7 @@ class ConsumerBase(object):
             return
 
         proxy.dispatch(ctx, data['version'],
-                       data['method'], **data['args'])
+                       data['method'], data.get('namespace'), **data['args'])
 
 
 class ZmqBaseReactor(ConsumerBase):
@@ -685,8 +691,8 @@ def _call(addr, context, topic, msg, timeout=None,
         'method': '-reply',
         'args': {
             'msg_id': msg_id,
-            'context': mcontext,
             'topic': reply_topic,
+            # TODO(ewindisch): safe to remove mcontext in I.
             'msg': [mcontext, msg]
         }
     }
diff --git a/cinder/openstack/common/rpc/proxy.py b/cinder/openstack/common/rpc/proxy.py
index 23305d41d29af3e3d1629166497e0b9791250296..4ddc5c936a1fd07da17d2c8a314f25250d60c390 100644 (file)
@@ -58,9 +58,13 @@ class RpcProxy(object):
         """Return the topic to use for a message."""
         return topic if topic else self.topic
 
+    @staticmethod
+    def make_namespaced_msg(method, namespace, **kwargs):
+        return {'method': method, 'namespace': namespace, 'args': kwargs}
+
     @staticmethod
     def make_msg(method, **kwargs):
-        return {'method': method, 'args': kwargs}
+        return RpcProxy.make_namespaced_msg(method, None, **kwargs)
 
     def call(self, context, msg, topic=None, version=None, timeout=None):
         """rpc.call() a remote method.
@@ -68,16 +72,21 @@ class RpcProxy(object):
         :param context: The request context
         :param msg: The message to send, including the method and args.
         :param topic: Override the topic for this message.
+        :param version: (Optional) Override the requested API version in this
+               message.
         :param timeout: (Optional) A timeout to use when waiting for the
                response.  If no timeout is specified, a default timeout will be
                used that is usually sufficient.
-        :param version: (Optional) Override the requested API version in this
-               message.
 
         :returns: The return value from the remote method.
         """
         self._set_version(msg, version)
-        return rpc.call(context, self._get_topic(topic), msg, timeout)
+        real_topic = self._get_topic(topic)
+        try:
+            return rpc.call(context, real_topic, msg, timeout)
+        except rpc.common.Timeout as exc:
+            raise rpc.common.Timeout(
+                exc.info, real_topic, msg.get('method'))
 
     def multicall(self, context, msg, topic=None, version=None, timeout=None):
         """rpc.multicall() a remote method.
@@ -85,17 +94,22 @@ class RpcProxy(object):
         :param context: The request context
         :param msg: The message to send, including the method and args.
         :param topic: Override the topic for this message.
+        :param version: (Optional) Override the requested API version in this
+               message.
         :param timeout: (Optional) A timeout to use when waiting for the
                response.  If no timeout is specified, a default timeout will be
                used that is usually sufficient.
-        :param version: (Optional) Override the requested API version in this
-               message.
 
         :returns: An iterator that lets you process each of the returned values
                   from the remote method as they arrive.
         """
         self._set_version(msg, version)
-        return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+        real_topic = self._get_topic(topic)
+        try:
+            return rpc.multicall(context, real_topic, msg, timeout)
+        except rpc.common.Timeout as exc:
+            raise rpc.common.Timeout(
+                exc.info, real_topic, msg.get('method'))
 
     def cast(self, context, msg, topic=None, version=None):
         """rpc.cast() a remote method.
diff --git a/cinder/openstack/common/rpc/zmq_receiver.py b/cinder/openstack/common/rpc/zmq_receiver.py
new file mode 100755 (executable)
index 0000000..e4c6ee3
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import sys
+
+from oslo.config import cfg
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import rpc
+from cinder.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+    CONF(sys.argv[1:], project='oslo')
+    logging.setup("oslo")
+
+    with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+        reactor.consume_in_thread()
+        reactor.wait()
diff --git a/cinder/openstack/common/service.py b/cinder/openstack/common/service.py
new file mode 100644 (file)
index 0000000..8600a0b
--- /dev/null
@@ -0,0 +1,332 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import errno
+import os
+import random
+import signal
+import sys
+import time
+
+import eventlet
+import logging as std_logging
+from oslo.config import cfg
+
+from cinder.openstack.common import eventlet_backdoor
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import threadgroup
+
+
+rpc = importutils.try_import('cinder.openstack.common.rpc')
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class Launcher(object):
+    """Launch one or more services and wait for them to complete."""
+
+    def __init__(self):
+        """Initialize the service launcher.
+
+        :returns: None
+
+        """
+        self._services = threadgroup.ThreadGroup()
+        eventlet_backdoor.initialize_if_enabled()
+
+    @staticmethod
+    def run_service(service):
+        """Start and wait for a service to finish.
+
+        :param service: service to run and wait for.
+        :returns: None
+
+        """
+        service.start()
+        service.wait()
+
+    def launch_service(self, service):
+        """Load and start the given service.
+
+        :param service: The service you would like to start.
+        :returns: None
+
+        """
+        self._services.add_thread(self.run_service, service)
+
+    def stop(self):
+        """Stop all services which are currently running.
+
+        :returns: None
+
+        """
+        self._services.stop()
+
+    def wait(self):
+        """Waits until all services have been stopped, and then returns.
+
+        :returns: None
+
+        """
+        self._services.wait()
+
+
+class SignalExit(SystemExit):
+    def __init__(self, signo, exccode=1):
+        super(SignalExit, self).__init__(exccode)
+        self.signo = signo
+
+
+class ServiceLauncher(Launcher):
+    def _handle_signal(self, signo, frame):
+        # Allow the process to be killed again and die from natural causes
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+        raise SignalExit(signo)
+
+    def wait(self):
+        signal.signal(signal.SIGTERM, self._handle_signal)
+        signal.signal(signal.SIGINT, self._handle_signal)
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+        status = None
+        try:
+            super(ServiceLauncher, self).wait()
+        except SignalExit as exc:
+            signame = {signal.SIGTERM: 'SIGTERM',
+                       signal.SIGINT: 'SIGINT'}[exc.signo]
+            LOG.info(_('Caught %s, exiting'), signame)
+            status = exc.code
+        except SystemExit as exc:
+            status = exc.code
+        finally:
+            if rpc:
+                rpc.cleanup()
+            self.stop()
+        return status
+
+
+class ServiceWrapper(object):
+    def __init__(self, service, workers):
+        self.service = service
+        self.workers = workers
+        self.children = set()
+        self.forktimes = []
+
+
+class ProcessLauncher(object):
+    def __init__(self):
+        self.children = {}
+        self.sigcaught = None
+        self.running = True
+        rfd, self.writepipe = os.pipe()
+        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+        signal.signal(signal.SIGTERM, self._handle_signal)
+        signal.signal(signal.SIGINT, self._handle_signal)
+
+    def _handle_signal(self, signo, frame):
+        self.sigcaught = signo
+        self.running = False
+
+        # Allow the process to be killed again and die from natural causes
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+    def _pipe_watcher(self):
+        # This will block until the write end is closed when the parent
+        # dies unexpectedly
+        self.readpipe.read()
+
+        LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+        sys.exit(1)
+
+    def _child_process(self, service):
+        # Setup child signal handlers differently
+        def _sigterm(*args):
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            raise SignalExit(signal.SIGTERM)
+
+        signal.signal(signal.SIGTERM, _sigterm)
+        # Block SIGINT and let the parent send us a SIGTERM
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+        # Reopen the eventlet hub to make sure we don't share an epoll
+        # fd with parent and/or siblings, which would be bad
+        eventlet.hubs.use_hub()
+
+        # Close write to ensure only parent has it open
+        os.close(self.writepipe)
+        # Create greenthread to watch for parent to close pipe
+        eventlet.spawn_n(self._pipe_watcher)
+
+        # Reseed random number generator
+        random.seed()
+
+        launcher = Launcher()
+        launcher.run_service(service)
+
+    def _start_child(self, wrap):
+        if len(wrap.forktimes) > wrap.workers:
+            # Limit ourselves to one process a second (over the period of
+            # number of workers * 1 second). This will allow workers to
+            # start up quickly but ensure we don't fork off children that
+            # die instantly too quickly.
+            if time.time() - wrap.forktimes[0] < wrap.workers:
+                LOG.info(_('Forking too fast, sleeping'))
+                time.sleep(1)
+
+            wrap.forktimes.pop(0)
+
+        wrap.forktimes.append(time.time())
+
+        pid = os.fork()
+        if pid == 0:
+            # NOTE(johannes): All exceptions are caught to ensure this
+            # doesn't fallback into the loop spawning children. It would
+            # be bad for a child to spawn more children.
+            status = 0
+            try:
+                self._child_process(wrap.service)
+            except SignalExit as exc:
+                signame = {signal.SIGTERM: 'SIGTERM',
+                           signal.SIGINT: 'SIGINT'}[exc.signo]
+                LOG.info(_('Caught %s, exiting'), signame)
+                status = exc.code
+            except SystemExit as exc:
+                status = exc.code
+            except BaseException:
+                LOG.exception(_('Unhandled exception'))
+                status = 2
+            finally:
+                wrap.service.stop()
+
+            os._exit(status)
+
+        LOG.info(_('Started child %d'), pid)
+
+        wrap.children.add(pid)
+        self.children[pid] = wrap
+
+        return pid
+
+    def launch_service(self, service, workers=1):
+        wrap = ServiceWrapper(service, workers)
+
+        LOG.info(_('Starting %d workers'), wrap.workers)
+        while self.running and len(wrap.children) < wrap.workers:
+            self._start_child(wrap)
+
+    def _wait_child(self):
+        try:
+            # Don't block if no child processes have exited
+            pid, status = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                return None
+        except OSError as exc:
+            if exc.errno not in (errno.EINTR, errno.ECHILD):
+                raise
+            return None
+
+        if os.WIFSIGNALED(status):
+            sig = os.WTERMSIG(status)
+            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+                     dict(pid=pid, sig=sig))
+        else:
+            code = os.WEXITSTATUS(status)
+            LOG.info(_('Child %(pid)s exited with status %(code)d'),
+                     dict(pid=pid, code=code))
+
+        if pid not in self.children:
+            LOG.warning(_('pid %d not in child list'), pid)
+            return None
+
+        wrap = self.children.pop(pid)
+        wrap.children.remove(pid)
+        return wrap
+
+    def wait(self):
+        """Loop waiting on children to die and respawning as necessary"""
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+        while self.running:
+            wrap = self._wait_child()
+            if not wrap:
+                # Yield to other threads if no children have exited
+                # Sleep for a short time to avoid excessive CPU usage
+                # (see bug #1095346)
+                eventlet.greenthread.sleep(.01)
+                continue
+
+            while self.running and len(wrap.children) < wrap.workers:
+                self._start_child(wrap)
+
+        if self.sigcaught:
+            signame = {signal.SIGTERM: 'SIGTERM',
+                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
+            LOG.info(_('Caught %s, stopping children'), signame)
+
+        for pid in self.children:
+            try:
+                os.kill(pid, signal.SIGTERM)
+            except OSError as exc:
+                if exc.errno != errno.ESRCH:
+                    raise
+
+        # Wait for children to die
+        if self.children:
+            LOG.info(_('Waiting on %d children to exit'), len(self.children))
+            while self.children:
+                self._wait_child()
+
+
+class Service(object):
+    """Service object for binaries running on hosts."""
+
+    def __init__(self, threads=1000):
+        self.tg = threadgroup.ThreadGroup(threads)
+
+    def start(self):
+        pass
+
+    def stop(self):
+        self.tg.stop()
+
+    def wait(self):
+        self.tg.wait()
+
+
+def launch(service, workers=None):
+    if workers:
+        launcher = ProcessLauncher()
+        launcher.launch_service(service, workers=workers)
+    else:
+        launcher = ServiceLauncher()
+        launcher.launch_service(service)
+    return launcher
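
A minimal sketch of how a binary might use this module (the service class and
worker count are made up):

    from cinder.openstack.common import service

    class VolumeService(service.Service):
        def start(self):
            # schedule periodic work on the service's thread group
            self.tg.add_timer(60, self._report_state)

        def _report_state(self):
            pass

    launcher = service.launch(VolumeService(), workers=2)
    launcher.wait()
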
diff --git a/cinder/openstack/common/threadgroup.py b/cinder/openstack/common/threadgroup.py
new file mode 100644 (file)
index 0000000..5d6ec00
--- /dev/null
@@ -0,0 +1,114 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from eventlet import greenlet
+from eventlet import greenpool
+from eventlet import greenthread
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import loopingcall
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+    """ Callback function to be passed to GreenThread.link() when we spawn()
+    Calls the :class:`ThreadGroup` to notify it.
+
+    """
+    kwargs['group'].thread_done(kwargs['thread'])
+
+
+class Thread(object):
+    """ Wrapper around a greenthread, that holds a reference to the
+    :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
+    it has done so it can be removed from the threads list.
+    """
+    def __init__(self, thread, group):
+        self.thread = thread
+        self.thread.link(_thread_done, group=group, thread=self)
+
+    def stop(self):
+        self.thread.kill()
+
+    def wait(self):
+        return self.thread.wait()
+
+
+class ThreadGroup(object):
+    """ The point of the ThreadGroup classis to:
+
+    * keep track of timers and greenthreads (making it easier to stop them
+      when need be).
+    * provide an easy API to add timers.
+    """
+    def __init__(self, thread_pool_size=10):
+        self.pool = greenpool.GreenPool(thread_pool_size)
+        self.threads = []
+        self.timers = []
+
+    def add_timer(self, interval, callback, initial_delay=None,
+                  *args, **kwargs):
+        pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
+        pulse.start(interval=interval,
+                    initial_delay=initial_delay)
+        self.timers.append(pulse)
+
+    def add_thread(self, callback, *args, **kwargs):
+        gt = self.pool.spawn(callback, *args, **kwargs)
+        th = Thread(gt, self)
+        self.threads.append(th)
+
+    def thread_done(self, thread):
+        self.threads.remove(thread)
+
+    def stop(self):
+        current = greenthread.getcurrent()
+        for x in self.threads:
+            if x is current:
+                # don't kill the current thread.
+                continue
+            try:
+                x.stop()
+            except Exception as ex:
+                LOG.exception(ex)
+
+        for x in self.timers:
+            try:
+                x.stop()
+            except Exception as ex:
+                LOG.exception(ex)
+        self.timers = []
+
+    def wait(self):
+        for x in self.timers:
+            try:
+                x.wait()
+            except greenlet.GreenletExit:
+                pass
+            except Exception as ex:
+                LOG.exception(ex)
+        current = greenthread.getcurrent()
+        for x in self.threads:
+            if x is current:
+                continue
+            try:
+                x.wait()
+            except greenlet.GreenletExit:
+                pass
+            except Exception as ex:
+                LOG.exception(ex)
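
Finally, a short sketch of using ThreadGroup directly (the callables are
placeholders):

    from cinder.openstack.common import threadgroup

    def _work(item):
        pass

    def _heartbeat():
        pass

    tg = threadgroup.ThreadGroup()
    tg.add_thread(_work, 'item-1')   # tracked greenthread from the pool
    tg.add_timer(30, _heartbeat)     # FixedIntervalLoopingCall, every 30s
    tg.stop()
    tg.wait()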