]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Update Oslo to 96d1f887dda Part 1
authorAlexander Gordeev <agordeev@mirantis.com>
Thu, 1 Aug 2013 13:22:20 +0000 (17:22 +0400)
committerAlexander Gordeev <agordeev@mirantis.com>
Tue, 13 Aug 2013 09:38:45 +0000 (13:38 +0400)
Oslo version 96d1f887dda21b43ba4376187f31953dee6f5273

This commit just injects fresh portion of code from Oslo
into Heat and fixes all the issues with new code

Partially implements blueprint oslo-db-support

Change-Id: I7e98c12ddf6689efc6ea6a4deab0b1c840297730

23 files changed:
etc/heat/heat.conf.sample
heat/openstack/common/config/generator.py [changed mode: 0755->0644]
heat/openstack/common/context.py
heat/openstack/common/eventlet_backdoor.py
heat/openstack/common/loopingcall.py
heat/openstack/common/network_utils.py
heat/openstack/common/notifier/api.py
heat/openstack/common/rpc/__init__.py
heat/openstack/common/rpc/amqp.py
heat/openstack/common/rpc/common.py
heat/openstack/common/rpc/impl_kombu.py
heat/openstack/common/rpc/impl_qpid.py
heat/openstack/common/rpc/impl_zmq.py
heat/openstack/common/rpc/matchmaker.py
heat/openstack/common/rpc/matchmaker_ring.py
heat/openstack/common/rpc/proxy.py
heat/openstack/common/rpc/service.py
heat/openstack/common/service.py
heat/openstack/common/sslutils.py [new file with mode: 0644]
heat/openstack/common/threadgroup.py
heat/openstack/common/timeutils.py
tools/config/generate_sample.sh [new file with mode: 0755]
tools/patch_tox_venv.py

index 95eaf4902f65be4a9173313472d8cf5f5ead17cb..fd4baf29c6d03fe2ef9faf435478c6d06952a191 100644 (file)
 # Options defined in heat.openstack.common.eventlet_backdoor
 #
 
-# port for eventlet backdoor to listen (integer value)
+# Enable eventlet backdoor.  Acceptable values are 0, <port>,
+# and <start>:<end>, where 0 results in listening on a random
+# tcp port number; <port> results in listening on the
+# specified port number (and not enabling backdoor if that
+# port is in use); and <start>:<end> results in listening on
+# the smallest unused port number within the specified range
+# of port numbers.  The chosen port is displayed in the
+# service's log file. (string value)
 #backdoor_port=<None>
 
 
 #control_exchange=openstack
 
 
+#
+# Options defined in heat.openstack.common.rpc.amqp
+#
+
+# Use durable queues in amqp. (boolean value)
+#amqp_durable_queues=false
+
+# Auto-delete queues in amqp. (boolean value)
+#amqp_auto_delete=false
+
+
 #
 # Options defined in heat.openstack.common.rpc.impl_kombu
 #
 
-# SSL version to use (valid only if SSL enabled) (string
-# value)
+# SSL version to use (valid only if SSL enabled). valid values
+# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
+# distributions (string value)
 #kombu_ssl_version=
 
 # SSL key file (valid only if SSL enabled) (string value)
 # value)
 #rabbit_max_retries=0
 
-# use durable queues in RabbitMQ (boolean value)
-#rabbit_durable_queues=false
-
 # use H/A queues in RabbitMQ (x-ha-policy: all).You need to
 # wipe RabbitMQ database when changing this option. (boolean
 # value)
 #matchmaker_heartbeat_ttl=600
 
 
+[ssl]
+
+#
+# Options defined in heat.openstack.common.sslutils
+#
+
+# CA certificate file to use to verify connecting clients
+# (string value)
+#ca_file=<None>
+
+# Certificate file to use when starting the server securely
+# (string value)
+#cert_file=<None>
+
+# Private key file to use when starting the server securely
+# (string value)
+#key_file=<None>
+
+
 [paste_deploy]
 
 #
 #ringfile=/etc/oslo/matchmaker_ring.json
 
 
-# Total option count: 109
old mode 100755 (executable)
new mode 100644 (file)
index 30c08a8..27e217e
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
 # Copyright 2012 SINA Corporation
@@ -16,8 +15,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 #
-# @author: Zhongyue Luo, SINA Corporation.
-#
 
 """Extracts OpenStack config option info from module(s)."""
 
@@ -53,7 +50,6 @@ OPT_TYPES = {
     MULTISTROPT: 'multi valued',
 }
 
-OPTION_COUNT = 0
 OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT,
                                               FLOATOPT, LISTOPT,
                                               MULTISTROPT]))
@@ -100,8 +96,6 @@ def generate(srcfiles):
     for group, opts in opts_by_group.items():
         print_group_opts(group, opts)
 
-    print("# Total option count: %d" % OPTION_COUNT)
-
 
 def _import_module(mod_str):
     try:
@@ -166,9 +160,7 @@ def _list_opts(obj):
 def print_group_opts(group, opts_by_module):
     print("[%s]" % group)
     print('')
-    global OPTION_COUNT
     for mod, opts in opts_by_module:
-        OPTION_COUNT += len(opts)
         print('#')
         print('# Options defined in %s' % mod)
         print('#')
@@ -189,24 +181,24 @@ def _get_my_ip():
         return None
 
 
-def _sanitize_default(s):
+def _sanitize_default(name, value):
     """Set up a reasonably sensible default for pybasedir, my_ip and host."""
-    if s.startswith(sys.prefix):
+    if value.startswith(sys.prefix):
         # NOTE(jd) Don't use os.path.join, because it is likely to think the
         # second part is an absolute pathname and therefore drop the first
         # part.
-        s = os.path.normpath("/usr/" + s[len(sys.prefix):])
-    elif s.startswith(BASEDIR):
-        return s.replace(BASEDIR, '/usr/lib/python/site-packages')
-    elif BASEDIR in s:
-        return s.replace(BASEDIR, '')
-    elif s == _get_my_ip():
+        value = os.path.normpath("/usr/" + value[len(sys.prefix):])
+    elif value.startswith(BASEDIR):
+        return value.replace(BASEDIR, '/usr/lib/python/site-packages')
+    elif BASEDIR in value:
+        return value.replace(BASEDIR, '')
+    elif value == _get_my_ip():
         return '10.0.0.1'
-    elif s == socket.gethostname():
+    elif value == socket.gethostname() and 'host' in name:
         return 'heat'
-    elif s.strip() != s:
-        return '"%s"' % s
-    return s
+    elif value.strip() != value:
+        return '"%s"' % value
+    return value
 
 
 def _print_opt(opt):
@@ -227,7 +219,8 @@ def _print_opt(opt):
             print('#%s=<None>' % opt_name)
         elif opt_type == STROPT:
             assert(isinstance(opt_default, basestring))
-            print('#%s=%s' % (opt_name, _sanitize_default(opt_default)))
+            print('#%s=%s' % (opt_name, _sanitize_default(opt_name,
+                                                          opt_default)))
         elif opt_type == BOOLOPT:
             assert(isinstance(opt_default, bool))
             print('#%s=%s' % (opt_name, str(opt_default).lower()))
index b125af71fd4d8cdcf17bfe5889c3d26ad5e3f5ad..1d9d7eb7c5abce5d51c13e79b8d5ce304c9fae53 100644 (file)
@@ -61,7 +61,7 @@ class RequestContext(object):
                 'request_id': self.request_id}
 
 
-def get_admin_context(show_deleted="no"):
+def get_admin_context(show_deleted=False):
     context = RequestContext(None,
                              tenant=None,
                              is_admin=True,
index 57b89ae914ebdfcd4ad041135d749319b5f4de77..7612d55bd77cb1be2e75716c9eac23a2fbefb36b 100644 (file)
 
 from __future__ import print_function
 
+import errno
 import gc
+import os
 import pprint
+import socket
 import sys
 import traceback
 
@@ -28,14 +31,34 @@ import eventlet.backdoor
 import greenlet
 from oslo.config import cfg
 
+from heat.openstack.common.gettextutils import _  # noqa
+from heat.openstack.common import log as logging
+
+help_for_backdoor_port = (
+    "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
+    "in listening on a random tcp port number; <port> results in listening "
+    "on the specified port number (and not enabling backdoor if that port "
+    "is in use); and <start>:<end> results in listening on the smallest "
+    "unused port number within the specified range of port numbers.  The "
+    "chosen port is displayed in the service's log file.")
 eventlet_backdoor_opts = [
-    cfg.IntOpt('backdoor_port',
+    cfg.StrOpt('backdoor_port',
                default=None,
-               help='port for eventlet backdoor to listen')
+               help="Enable eventlet backdoor.  %s" % help_for_backdoor_port)
 ]
 
 CONF = cfg.CONF
 CONF.register_opts(eventlet_backdoor_opts)
+LOG = logging.getLogger(__name__)
+
+
+class EventletBackdoorConfigValueError(Exception):
+    def __init__(self, port_range, help_msg, ex):
+        msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
+               '%(help)s' %
+               {'range': port_range, 'ex': ex, 'help': help_msg})
+        super(EventletBackdoorConfigValueError, self).__init__(msg)
+        self.port_range = port_range
 
 
 def _dont_use_this():
@@ -60,6 +83,33 @@ def _print_nativethreads():
         print()
 
 
+def _parse_port_range(port_range):
+    if ':' not in port_range:
+        start, end = port_range, port_range
+    else:
+        start, end = port_range.split(':', 1)
+    try:
+        start, end = int(start), int(end)
+        if end < start:
+            raise ValueError
+        return start, end
+    except ValueError as ex:
+        raise EventletBackdoorConfigValueError(port_range, ex,
+                                               help_for_backdoor_port)
+
+
+def _listen(host, start_port, end_port, listen_func):
+    try_port = start_port
+    while True:
+        try:
+            return listen_func((host, try_port))
+        except socket.error as exc:
+            if (exc.errno != errno.EADDRINUSE or
+               try_port >= end_port):
+                raise
+            try_port += 1
+
+
 def initialize_if_enabled():
     backdoor_locals = {
         'exit': _dont_use_this,      # So we don't exit the entire process
@@ -72,6 +122,8 @@ def initialize_if_enabled():
     if CONF.backdoor_port is None:
         return None
 
+    start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
+
     # NOTE(johannes): The standard sys.displayhook will print the value of
     # the last expression and set it to __builtin__._, which overwrites
     # the __builtin__._ that gettext sets. Let's switch to using pprint
@@ -82,8 +134,13 @@ def initialize_if_enabled():
             pprint.pprint(val)
     sys.displayhook = displayhook
 
-    sock = eventlet.listen(('localhost', CONF.backdoor_port))
+    sock = _listen('localhost', start_port, end_port, eventlet.listen)
+
+    # In the case of backdoor port being zero, a port number is assigned by
+    # listen().  In any case, pull the port number out here.
     port = sock.getsockname()[1]
+    LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+             {'port': port, 'pid': os.getpid()})
     eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
                      locals=backdoor_locals)
     return port
index 0fa60bd9e41fa66fbb2f2b83d51ee95dabd7ca8c..1cd248423bcace038b4c9ea7e461f8277c68f451 100644 (file)
@@ -22,7 +22,7 @@ import sys
 from eventlet import event
 from eventlet import greenthread
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import log as logging
 from heat.openstack.common import timeutils
 
index d049d658ce89193646ab5b7f5b6380294f2cb7c1..dbed1ceb44faee5d6f756c2c359d8c2fdb615561 100644 (file)
 Network-related utilities and helper functions.
 """
 
-from heat.openstack.common import log as logging
-
-
-LOG = logging.getLogger(__name__)
+import urlparse
 
 
 def parse_host_port(address, default_port=None):
@@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None):
             port = default_port
 
     return (host, None if port is None else int(port))
+
+
+def urlsplit(url, scheme='', allow_fragments=True):
+    """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
+    This function papers over Python issue9374 when needed.
+
+    The parameters are the same as urlparse.urlsplit.
+    """
+    scheme, netloc, path, query, fragment = urlparse.urlsplit(
+        url, scheme, allow_fragments)
+    if allow_fragments and '#' in path:
+        path, fragment = path.split('#', 1)
+    if '?' in path:
+        path, query = path.split('?', 1)
+    return urlparse.SplitResult(scheme, netloc, path, query, fragment)
index 428e5068c1d333d7075f700ecefea0990d41a395..cd1e7b4bf98b2b4e4e64ffcfed8398969b0208d0 100644 (file)
@@ -157,29 +157,16 @@ def _get_drivers():
     if _drivers is None:
         _drivers = {}
         for notification_driver in CONF.notification_driver:
-            add_driver(notification_driver)
-
+            try:
+                driver = importutils.import_module(notification_driver)
+                _drivers[notification_driver] = driver
+            except ImportError:
+                LOG.exception(_("Failed to load notifier %s. "
+                                "These notifications will not be sent.") %
+                              notification_driver)
     return _drivers.values()
 
 
-def add_driver(notification_driver):
-    """Add a notification driver at runtime."""
-    # Make sure the driver list is initialized.
-    _get_drivers()
-    if isinstance(notification_driver, basestring):
-        # Load and add
-        try:
-            driver = importutils.import_module(notification_driver)
-            _drivers[notification_driver] = driver
-        except ImportError:
-            LOG.exception(_("Failed to load notifier %s. "
-                            "These notifications will not be sent.") %
-                          notification_driver)
-    else:
-        # Driver is already loaded; just add the object.
-        _drivers[notification_driver] = notification_driver
-
-
 def _reset_drivers():
     """Used by unit tests to reset the drivers."""
     global _drivers
index 6907f726a59bf35f10d2ae88a625b957d1101d96..ab62f93d2885ddf85816a9f7d82386bb04d6f5fa 100644 (file)
@@ -29,7 +29,7 @@ import inspect
 
 from oslo.config import cfg
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import importutils
 from heat.openstack.common import local
 from heat.openstack.common import log as logging
index ac73877482e1912faabf068a52231752cc610bb6..36501c4201aec05bd31f19218e02d39e33602e29 100644 (file)
@@ -34,14 +34,28 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import queue
 from eventlet import semaphore
+from oslo.config import cfg
 
 from heat.openstack.common import excutils
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import local
 from heat.openstack.common import log as logging
 from heat.openstack.common.rpc import common as rpc_common
 
 
+amqp_opts = [
+    cfg.BoolOpt('amqp_durable_queues',
+                default=False,
+                deprecated_name='rabbit_durable_queues',
+                deprecated_group='DEFAULT',
+                help='Use durable queues in amqp.'),
+    cfg.BoolOpt('amqp_auto_delete',
+                default=False,
+                help='Auto-delete queues in amqp.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
 UNIQUE_ID = '_unique_id'
 LOG = logging.getLogger(__name__)
 
@@ -151,11 +165,13 @@ class ConnectionContext(rpc_common.Connection):
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)
 
-    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+                           ack_on_error=True):
         self.connection.join_consumer_pool(callback,
                                            pool_name,
                                            topic,
-                                           exchange_name)
+                                           exchange_name,
+                                           ack_on_error)
 
     def consume_in_thread(self):
         self.connection.consume_in_thread()
@@ -219,12 +235,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
             failure = rpc_common.serialize_remote_exception(failure,
                                                             log_failure)
 
-        try:
-            msg = {'result': reply, 'failure': failure}
-        except TypeError:
-            msg = {'result': dict((k, repr(v))
-                   for k, v in reply.__dict__.iteritems()),
-                   'failure': failure}
+        msg = {'result': reply, 'failure': failure}
         if ending:
             msg['ending'] = True
         _add_unique_id(msg)
index 3651611247cb1d37fe81d791a804d4f0ea3c092b..bc19c023cb948d516af7eef8ddd56eaff4354db1 100644 (file)
@@ -24,6 +24,7 @@ import traceback
 from oslo.config import cfg
 import six
 
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
 from heat.openstack.common import local
@@ -73,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote'
 
 
 class RPCException(Exception):
-    message = _("An unknown RPC related exception occurred.")
+    msg_fmt = _("An unknown RPC related exception occurred.")
 
     def __init__(self, message=None, **kwargs):
         self.kwargs = kwargs
 
         if not message:
             try:
-                message = self.message % kwargs
+                message = self.msg_fmt % kwargs
 
             except Exception:
                 # kwargs doesn't match a variable in the message
@@ -89,7 +90,7 @@ class RPCException(Exception):
                 for name, value in kwargs.iteritems():
                     LOG.error("%s: %s" % (name, value))
                 # at least get the core message out if something happened
-                message = self.message
+                message = self.msg_fmt
 
         super(RPCException, self).__init__(message)
 
@@ -103,7 +104,7 @@ class RemoteError(RPCException):
     contains all of the relevant info.
 
     """
-    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+    msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
 
     def __init__(self, exc_type=None, value=None, traceback=None):
         self.exc_type = exc_type
@@ -120,7 +121,7 @@ class Timeout(RPCException):
     This exception is raised if the rpc_response_timeout is reached while
     waiting for a response from the remote side.
     """
-    message = _('Timeout while waiting on RPC response - '
+    msg_fmt = _('Timeout while waiting on RPC response - '
                 'topic: "%(topic)s", RPC method: "%(method)s" '
                 'info: "%(info)s"')
 
@@ -143,25 +144,25 @@ class Timeout(RPCException):
 
 
 class DuplicateMessageError(RPCException):
-    message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+    msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
 
 
 class InvalidRPCConnectionReuse(RPCException):
-    message = _("Invalid reuse of an RPC connection.")
+    msg_fmt = _("Invalid reuse of an RPC connection.")
 
 
 class UnsupportedRpcVersion(RPCException):
-    message = _("Specified RPC version, %(version)s, not supported by "
+    msg_fmt = _("Specified RPC version, %(version)s, not supported by "
                 "this endpoint.")
 
 
 class UnsupportedRpcEnvelopeVersion(RPCException):
-    message = _("Specified RPC envelope version, %(version)s, "
+    msg_fmt = _("Specified RPC envelope version, %(version)s, "
                 "not supported by this endpoint.")
 
 
 class RpcVersionCapError(RPCException):
-    message = _("Specified RPC version cap, %(version_cap)s, is too low")
+    msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
 
 
 class Connection(object):
@@ -260,41 +261,20 @@ class Connection(object):
 
 def _safe_log(log_func, msg, msg_data):
     """Sanitizes the msg_data field before logging."""
-    SANITIZE = {'set_admin_password': [('args', 'new_pass')],
-                'run_instance': [('args', 'admin_password')],
-                'route_message': [('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'password'),
-                                  ('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'admin_password')]}
-
-    has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
-    has_context_token = '_context_auth_token' in msg_data
-    has_token = 'auth_token' in msg_data
-
-    if not any([has_method, has_context_token, has_token]):
-        return log_func(msg, msg_data)
-
-    msg_data = copy.deepcopy(msg_data)
-
-    if has_method:
-        for arg in SANITIZE.get(msg_data['method'], []):
-            try:
-                d = msg_data
-                for elem in arg[:-1]:
-                    d = d[elem]
-                d[arg[-1]] = '<SANITIZED>'
-            except KeyError as e:
-                LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
-                         {'item': arg,
-                          'err': e})
-
-    if has_context_token:
-        msg_data['_context_auth_token'] = '<SANITIZED>'
-
-    if has_token:
-        msg_data['auth_token'] = '<SANITIZED>'
-
-    return log_func(msg, msg_data)
+    SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
+
+    def _fix_passwords(d):
+        """Sanitizes the password fields in the dictionary."""
+        for k in d.iterkeys():
+            if k.lower().find('password') != -1:
+                d[k] = '<SANITIZED>'
+            elif k.lower() in SANITIZE:
+                d[k] = '<SANITIZED>'
+            elif isinstance(d[k], dict):
+                _fix_passwords(d[k])
+        return d
+
+    return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
 
 
 def serialize_remote_exception(failure_info, log_failure=True):
index c29007dbd12f989f6dda34ce1255c72b4018b1c2..b4ab92bacf8e9c68322075094847720b73ab0ef6 100644 (file)
@@ -18,7 +18,6 @@ import functools
 import itertools
 import socket
 import ssl
-import sys
 import time
 import uuid
 
@@ -30,15 +29,20 @@ import kombu.entity
 import kombu.messaging
 from oslo.config import cfg
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common import excutils
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import network_utils
 from heat.openstack.common.rpc import amqp as rpc_amqp
 from heat.openstack.common.rpc import common as rpc_common
+from heat.openstack.common import sslutils
 
 kombu_opts = [
     cfg.StrOpt('kombu_ssl_version',
                default='',
-               help='SSL version to use (valid only if SSL enabled)'),
+               help='SSL version to use (valid only if SSL enabled). '
+                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+                    'be available on some distributions'
+               ),
     cfg.StrOpt('kombu_ssl_keyfile',
                default='',
                help='SSL key file (valid only if SSL enabled)'),
@@ -82,9 +86,6 @@ kombu_opts = [
                default=0,
                help='maximum retries with trying to connect to RabbitMQ '
                     '(the default of 0 implies an infinite retry count)'),
-    cfg.BoolOpt('rabbit_durable_queues',
-                default=False,
-                help='use durable queues in RabbitMQ'),
     cfg.BoolOpt('rabbit_ha_queues',
                 default=False,
                 help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@@ -129,6 +130,7 @@ class ConsumerBase(object):
         self.tag = str(tag)
         self.kwargs = kwargs
         self.queue = None
+        self.ack_on_error = kwargs.get('ack_on_error', True)
         self.reconnect(channel)
 
     def reconnect(self, channel):
@@ -138,6 +140,36 @@ class ConsumerBase(object):
         self.queue = kombu.entity.Queue(**self.kwargs)
         self.queue.declare()
 
+    def _callback_handler(self, message, callback):
+        """Call callback with deserialized message.
+
+        Messages that are processed without exception are ack'ed.
+
+        If the message processing generates an exception, it will be
+        ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
+        Rejection is better than waiting for the message to timeout.
+        Rejected messages are immediately requeued.
+        """
+
+        ack_msg = False
+        try:
+            msg = rpc_common.deserialize_msg(message.payload)
+            callback(msg)
+            ack_msg = True
+        except Exception:
+            if self.ack_on_error:
+                ack_msg = True
+                LOG.exception(_("Failed to process message"
+                                " ... skipping it."))
+            else:
+                LOG.exception(_("Failed to process message"
+                                " ... will requeue."))
+        finally:
+            if ack_msg:
+                message.ack()
+            else:
+                message.reject()
+
     def consume(self, *args, **kwargs):
         """Actually declare the consumer on the amqp channel.  This will
         start the flow of messages from the queue.  Using the
@@ -150,8 +182,6 @@ class ConsumerBase(object):
         If kwargs['nowait'] is True, then this call will block until
         a message is read.
 
-        Messages will automatically be acked if the callback doesn't
-        raise an exception
         """
 
         options = {'consumer_tag': self.tag}
@@ -162,13 +192,7 @@ class ConsumerBase(object):
 
         def _callback(raw_message):
             message = self.channel.message_to_python(raw_message)
-            try:
-                msg = rpc_common.deserialize_msg(message.payload)
-                callback(msg)
-            except Exception:
-                LOG.exception(_("Failed to process message... skipping it."))
-            finally:
-                message.ack()
+            self._callback_handler(message, callback)
 
         self.queue.consume(*args, callback=_callback, **options)
 
@@ -233,9 +257,9 @@ class TopicConsumer(ConsumerBase):
         Other kombu options may be passed as keyword arguments
         """
         # Default options
-        options = {'durable': conf.rabbit_durable_queues,
+        options = {'durable': conf.amqp_durable_queues,
                    'queue_arguments': _get_queue_arguments(conf),
-                   'auto_delete': False,
+                   'auto_delete': conf.amqp_auto_delete,
                    'exclusive': False}
         options.update(kwargs)
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@@ -339,8 +363,8 @@ class TopicPublisher(Publisher):
 
         Kombu options may be passed as keyword args to override defaults
         """
-        options = {'durable': conf.rabbit_durable_queues,
-                   'auto_delete': False,
+        options = {'durable': conf.amqp_durable_queues,
+                   'auto_delete': conf.amqp_auto_delete,
                    'exclusive': False}
         options.update(kwargs)
         exchange_name = rpc_amqp.get_control_exchange(conf)
@@ -370,7 +394,7 @@ class NotifyPublisher(TopicPublisher):
     """Publisher class for 'notify'."""
 
     def __init__(self, conf, channel, topic, **kwargs):
-        self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+        self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
         self.queue_arguments = _get_queue_arguments(conf)
         super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
 
@@ -454,7 +478,8 @@ class Connection(object):
 
         # http://docs.python.org/library/ssl.html - ssl.wrap_socket
         if self.conf.kombu_ssl_version:
-            ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+            ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+                self.conf.kombu_ssl_version)
         if self.conf.kombu_ssl_keyfile:
             ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
         if self.conf.kombu_ssl_certfile:
@@ -465,12 +490,8 @@ class Connection(object):
             # future with this?
             ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
 
-        if not ssl_params:
-            # Just have the default behavior
-            return True
-        else:
-            # Return the extended behavior
-            return ssl_params
+        # Return the extended behavior or just have the default behavior
+        return ssl_params or True
 
     def _connect(self, params):
         """Connect to rabbit.  Re-establish any queues that may have
@@ -537,13 +558,11 @@ class Connection(object):
             log_info.update(params)
 
             if self.max_retries and attempt == self.max_retries:
-                LOG.error(_('Unable to connect to AMQP server on '
-                            '%(hostname)s:%(port)d after %(max_retries)d '
-                            'tries: %(err_str)s') % log_info)
-                # NOTE(comstud): Copied from original code.  There's
-                # really no better recourse because if this was a queue we
-                # need to consume on, we have no way to consume anymore.
-                sys.exit(1)
+                msg = _('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info
+                LOG.error(msg)
+                raise rpc_common.RPCException(msg)
 
             if attempt == 1:
                 sleep_time = self.interval_start or 1
@@ -635,8 +654,8 @@ class Connection(object):
 
         def _consume():
             if info['do_consume']:
-                queues_head = self.consumers[:-1]
-                queues_tail = self.consumers[-1]
+                queues_head = self.consumers[:-1]  # not fanout.
+                queues_tail = self.consumers[-1]  # fanout
                 for queue in queues_head:
                     queue.consume(nowait=True)
                 queues_tail.consume(nowait=False)
@@ -685,11 +704,12 @@ class Connection(object):
         self.declare_consumer(DirectConsumer, topic, callback)
 
     def declare_topic_consumer(self, topic, callback=None, queue_name=None,
-                               exchange_name=None):
+                               exchange_name=None, ack_on_error=True):
         """Create a 'topic' consumer."""
         self.declare_consumer(functools.partial(TopicConsumer,
                                                 name=queue_name,
                                                 exchange_name=exchange_name,
+                                                ack_on_error=ack_on_error,
                                                 ),
                               topic, callback)
 
@@ -724,6 +744,7 @@ class Connection(object):
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -754,7 +775,7 @@ class Connection(object):
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
+                           exchange_name=None, ack_on_error=True):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.
 
@@ -775,6 +796,7 @@ class Connection(object):
             topic=topic,
             exchange_name=exchange_name,
             callback=callback_wrapper,
+            ack_on_error=ack_on_error,
         )
 
 
index b98d84bd2d2afc3f360470a1d36fa696ecb83873..c3fc593b9b60135bdec1fcbe901413ff42578ead 100644 (file)
@@ -24,7 +24,8 @@ import eventlet
 import greenlet
 from oslo.config import cfg
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common import excutils
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
 from heat.openstack.common import log as logging
@@ -118,10 +119,17 @@ class ConsumerBase(object):
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
-        self.reconnect(session)
+        self.connect(session)
+
+    def connect(self, session):
+        """Declare the reciever on connect."""
+        self._declare_receiver(session)
 
     def reconnect(self, session):
         """Re-declare the receiver after a qpid reconnect."""
+        self._declare_receiver(session)
+
+    def _declare_receiver(self, session):
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
@@ -152,11 +160,15 @@ class ConsumerBase(object):
         except Exception:
             LOG.exception(_("Failed to process message... skipping it."))
         finally:
+            # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
 
     def get_receiver(self):
         return self.receiver
 
+    def get_node_name(self):
+        return self.address.split(';')[0]
+
 
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'."""
@@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
         'callback' is the callback to call when messages are received
         """
 
-        super(DirectConsumer, self).__init__(session, callback,
-                                             "%s/%s" % (msg_id, msg_id),
-                                             {"type": "direct"},
-                                             msg_id,
-                                             {"exclusive": True})
+        super(DirectConsumer, self).__init__(
+            session, callback,
+            "%s/%s" % (msg_id, msg_id),
+            {"type": "direct"},
+            msg_id,
+            {
+                "auto-delete": conf.amqp_auto_delete,
+                "exclusive": True,
+                "durable": conf.amqp_durable_queues,
+            })
 
 
 class TopicConsumer(ConsumerBase):
@@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
         """
 
         exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
-        super(TopicConsumer, self).__init__(session, callback,
-                                            "%s/%s" % (exchange_name, topic),
-                                            {}, name or topic, {})
+        super(TopicConsumer, self).__init__(
+            session, callback,
+            "%s/%s" % (exchange_name, topic),
+            {}, name or topic,
+            {
+                "auto-delete": conf.amqp_auto_delete,
+                "durable": conf.amqp_durable_queues,
+            })
 
 
 class FanoutConsumer(ConsumerBase):
@@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
         'topic' is the topic to listen on
         'callback' is the callback to call when messages are received
         """
+        self.conf = conf
 
         super(FanoutConsumer, self).__init__(
             session, callback,
@@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
             "%s_fanout_%s" % (topic, uuid.uuid4().hex),
             {"exclusive": True})
 
+    def reconnect(self, session):
+        topic = self.get_node_name().rpartition('_fanout')[0]
+        params = {
+            'session': session,
+            'topic': topic,
+            'callback': self.callback,
+        }
+
+        self.__init__(conf=self.conf, **params)
+
+        super(FanoutConsumer, self).reconnect(session)
+
 
 class Publisher(object):
     """Base Publisher class."""
@@ -575,6 +610,7 @@ class Connection(object):
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
         def _consumer_thread():
             try:
                 self.consume()
@@ -615,7 +651,7 @@ class Connection(object):
         return consumer
 
     def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
+                           exchange_name=None, ack_on_error=True):
         """Register as a member of a group of consumers for a given topic from
         the specified exchange.
 
index 1a2f4b3ed8bea71b62d033946f480b34d24a661b..44c1267d67bbfb0f634b1f2566b81c8b22f6624a 100644 (file)
@@ -27,7 +27,7 @@ import greenlet
 from oslo.config import cfg
 
 from heat.openstack.common import excutils
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
 from heat.openstack.common.rpc import common as rpc_common
@@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase):
     def __init__(self, conf):
         super(ZmqBaseReactor, self).__init__()
 
-        self.mapping = {}
         self.proxies = {}
         self.threads = []
         self.sockets = []
@@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
 
         self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
 
-    def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
-                 zmq_type_out=None, in_bind=True, out_bind=True,
-                 subscribe=None):
+    def register(self, proxy, in_addr, zmq_type_in,
+                 in_bind=True, subscribe=None):
 
         LOG.info(_("Registering reactor"))
 
@@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
 
         LOG.info(_("In reactor registered"))
 
-        if not out_addr:
-            return
-
-        if zmq_type_out not in (zmq.PUSH, zmq.PUB):
-            raise RPCException("Bad output socktype")
-
-        # Items push out.
-        outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
-
-        self.mapping[inq] = outq
-        self.mapping[outq] = inq
-        self.sockets.append(outq)
-
-        LOG.info(_("Out reactor registered"))
-
     def consume_in_thread(self):
         def _consume(sock):
             LOG.info(_("Consuming socket"))
@@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor):
         try:
             self.register(consumption_proxy,
                           consume_in,
-                          zmq.PULL,
-                          out_bind=True)
+                          zmq.PULL)
         except zmq.ZMQError:
             if os.access(ipc_dir, os.X_OK):
                 with excutils.save_and_reraise_exception():
@@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
         LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
-        if sock in self.mapping:
-            LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
-                'data': data})
-            self.mapping[sock].send(data)
-            return
 
         proxy = self.proxies[sock]
 
index f822c715cec28cb1f69e471ef789d2ff29af2ba6..41d58177d9e3c734ae0a95c803a85179b140a89f 100644 (file)
@@ -23,7 +23,7 @@ import contextlib
 import eventlet
 from oslo.config import cfg
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import log as logging
 
 
@@ -248,9 +248,7 @@ class DirectBinding(Binding):
     that it maps directly to a host, thus direct.
     """
     def test(self, key):
-        if '.' in key:
-            return True
-        return False
+        return '.' in key
 
 
 class TopicBinding(Binding):
@@ -262,17 +260,13 @@ class TopicBinding(Binding):
     matches that of a direct exchange.
     """
     def test(self, key):
-        if '.' not in key:
-            return True
-        return False
+        return '.' not in key
 
 
 class FanoutBinding(Binding):
     """Match on fanout keys, where key starts with 'fanout.' string."""
     def test(self, key):
-        if key.startswith('fanout~'):
-            return True
-        return False
+        return key.startswith('fanout~')
 
 
 class StubExchange(Exchange):
index 1b623b2a11a691815e269db6e77027e5f96311be..94749371483190fd94b171399d2aa111592bdbb1 100644 (file)
@@ -23,7 +23,7 @@ import json
 
 from oslo.config import cfg
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import log as logging
 from heat.openstack.common.rpc import matchmaker as mm
 
@@ -63,9 +63,7 @@ class RingExchange(mm.Exchange):
             self.ring0[k] = itertools.cycle(self.ring[k])
 
     def _ring_has(self, key):
-        if key in self.ring0:
-            return True
-        return False
+        return key in self.ring0
 
 
 class RoundRobinRingExchange(RingExchange):
index 178f40e827a97185ffff5448b4e6f83f4e825474..f2957245da6dc98493536c7464746544f0db6dd4 100644 (file)
@@ -69,7 +69,7 @@ class RpcProxy(object):
         v = vers if vers else self.default_version
         if (self.version_cap and not
                 rpc_common.version_is_compatible(self.version_cap, v)):
-            raise rpc_common.RpcVersionCapError(version=self.version_cap)
+            raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
         msg['version'] = v
 
     def _get_topic(self, topic):
index 7a02bedeefe88c1bc18014e0f2e1e37cd29afaf7..cd983782e50a9e47561d5241befd38e878ec48a2 100644 (file)
@@ -17,7 +17,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import log as logging
 from heat.openstack.common import rpc
 from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
@@ -32,10 +32,11 @@ class Service(service.Service):
 
     A service enables rpc by listening to queues based on topic and host.
     """
-    def __init__(self, host, topic, manager=None):
+    def __init__(self, host, topic, manager=None, serializer=None):
         super(Service, self).__init__()
         self.host = host
         self.topic = topic
+        self.serializer = serializer
         if manager is None:
             self.manager = self
         else:
@@ -48,7 +49,8 @@ class Service(service.Service):
         LOG.debug(_("Creating Consumer connection for Service %s") %
                   self.topic)
 
-        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
+                                                  self.serializer)
 
         # Share this same connection for these Consumers
         self.conn.create_consumer(self.topic, dispatcher, fanout=False)
index 0f61a68da13a8bc1f079ff594fdecfaaac5bbfef..e4a29a8b3ef68e96307ccd106c386d5e36dfcf5e 100644 (file)
@@ -27,11 +27,12 @@ import sys
 import time
 
 import eventlet
+from eventlet import event
 import logging as std_logging
 from oslo.config import cfg
 
 from heat.openstack.common import eventlet_backdoor
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _  # noqa
 from heat.openstack.common import importutils
 from heat.openstack.common import log as logging
 from heat.openstack.common import threadgroup
@@ -51,20 +52,9 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services = threadgroup.ThreadGroup()
+        self.services = Services()
         self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
 
-    @staticmethod
-    def run_service(service):
-        """Start and wait for a service to finish.
-
-        :param service: service to run and wait for.
-        :returns: None
-
-        """
-        service.start()
-        service.wait()
-
     def launch_service(self, service):
         """Load and start the given service.
 
@@ -73,7 +63,7 @@ class Launcher(object):
 
         """
         service.backdoor_port = self.backdoor_port
-        self._services.add_thread(self.run_service, service)
+        self.services.add(service)
 
     def stop(self):
         """Stop all services which are currently running.
@@ -81,7 +71,7 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services.stop()
+        self.services.stop()
 
     def wait(self):
         """Waits until all services have been stopped, and then returns.
@@ -89,7 +79,16 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services.wait()
+        self.services.wait()
+
+    def restart(self):
+        """Reload config files and restart service.
+
+        :returns: None
+
+        """
+        cfg.CONF.reload_config_files()
+        self.services.restart()
 
 
 class SignalExit(SystemExit):
@@ -103,31 +102,51 @@ class ServiceLauncher(Launcher):
         # Allow the process to be killed again and die from natural causes
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
         signal.signal(signal.SIGINT, signal.SIG_DFL)
+        signal.signal(signal.SIGHUP, signal.SIG_DFL)
 
         raise SignalExit(signo)
 
-    def wait(self):
+    def handle_signal(self):
         signal.signal(signal.SIGTERM, self._handle_signal)
         signal.signal(signal.SIGINT, self._handle_signal)
+        signal.signal(signal.SIGHUP, self._handle_signal)
+
+    def _wait_for_exit_or_signal(self):
+        status = None
+        signo = 0
 
         LOG.debug(_('Full set of CONF:'))
         CONF.log_opt_values(LOG, std_logging.DEBUG)
 
-        status = None
         try:
             super(ServiceLauncher, self).wait()
         except SignalExit as exc:
             signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[exc.signo]
+                       signal.SIGINT: 'SIGINT',
+                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
             LOG.info(_('Caught %s, exiting'), signame)
             status = exc.code
+            signo = exc.signo
         except SystemExit as exc:
             status = exc.code
         finally:
-            if rpc:
-                rpc.cleanup()
             self.stop()
-        return status
+            if rpc:
+                try:
+                    rpc.cleanup()
+                except Exception:
+                    # We're shutting down, so it doesn't matter at this point.
+                    LOG.exception(_('Exception during rpc cleanup.'))
+
+        return status, signo
+
+    def wait(self):
+        while True:
+            self.handle_signal()
+            status, signo = self._wait_for_exit_or_signal()
+            if signo != signal.SIGHUP:
+                return status
+            self.restart()
 
 
 class ServiceWrapper(object):
@@ -145,9 +164,12 @@ class ProcessLauncher(object):
         self.running = True
         rfd, self.writepipe = os.pipe()
         self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+        self.handle_signal()
 
+    def handle_signal(self):
         signal.signal(signal.SIGTERM, self._handle_signal)
         signal.signal(signal.SIGINT, self._handle_signal)
+        signal.signal(signal.SIGHUP, self._handle_signal)
 
     def _handle_signal(self, signo, frame):
         self.sigcaught = signo
@@ -156,6 +178,7 @@ class ProcessLauncher(object):
         # Allow the process to be killed again and die from natural causes
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
         signal.signal(signal.SIGINT, signal.SIG_DFL)
+        signal.signal(signal.SIGHUP, signal.SIG_DFL)
 
     def _pipe_watcher(self):
         # This will block until the write end is closed when the parent
@@ -166,16 +189,47 @@ class ProcessLauncher(object):
 
         sys.exit(1)
 
-    def _child_process(self, service):
+    def _child_process_handle_signal(self):
         # Setup child signal handlers differently
         def _sigterm(*args):
             signal.signal(signal.SIGTERM, signal.SIG_DFL)
             raise SignalExit(signal.SIGTERM)
 
+        def _sighup(*args):
+            signal.signal(signal.SIGHUP, signal.SIG_DFL)
+            raise SignalExit(signal.SIGHUP)
+
         signal.signal(signal.SIGTERM, _sigterm)
+        signal.signal(signal.SIGHUP, _sighup)
         # Block SIGINT and let the parent send us a SIGTERM
         signal.signal(signal.SIGINT, signal.SIG_IGN)
 
+    def _child_wait_for_exit_or_signal(self, launcher):
+        status = None
+        signo = 0
+
+        try:
+            launcher.wait()
+        except SignalExit as exc:
+            signame = {signal.SIGTERM: 'SIGTERM',
+                       signal.SIGINT: 'SIGINT',
+                       signal.SIGHUP: 'SIGHUP'}[exc.signo]
+            LOG.info(_('Caught %s, exiting'), signame)
+            status = exc.code
+            signo = exc.signo
+        except SystemExit as exc:
+            status = exc.code
+        except BaseException:
+            LOG.exception(_('Unhandled exception'))
+            status = 2
+        finally:
+            launcher.stop()
+
+        return status, signo
+
+    def _child_process(self, service):
+        self._child_process_handle_signal()
+
         # Reopen the eventlet hub to make sure we don't share an epoll
         # fd with parent and/or siblings, which would be bad
         eventlet.hubs.use_hub()
@@ -189,7 +243,8 @@ class ProcessLauncher(object):
         random.seed()
 
         launcher = Launcher()
-        launcher.run_service(service)
+        launcher.launch_service(service)
+        return launcher
 
     def _start_child(self, wrap):
         if len(wrap.forktimes) > wrap.workers:
@@ -210,21 +265,13 @@ class ProcessLauncher(object):
             # NOTE(johannes): All exceptions are caught to ensure this
             # doesn't fallback into the loop spawning children. It would
             # be bad for a child to spawn more children.
-            status = 0
-            try:
-                self._child_process(wrap.service)
-            except SignalExit as exc:
-                signame = {signal.SIGTERM: 'SIGTERM',
-                           signal.SIGINT: 'SIGINT'}[exc.signo]
-                LOG.info(_('Caught %s, exiting'), signame)
-                status = exc.code
-            except SystemExit as exc:
-                status = exc.code
-            except BaseException:
-                LOG.exception(_('Unhandled exception'))
-                status = 2
-            finally:
-                wrap.service.stop()
+            launcher = self._child_process(wrap.service)
+            while True:
+                self._child_process_handle_signal()
+                status, signo = self._child_wait_for_exit_or_signal(launcher)
+                if signo != signal.SIGHUP:
+                    break
+                launcher.restart()
 
             os._exit(status)
 
@@ -270,12 +317,7 @@ class ProcessLauncher(object):
         wrap.children.remove(pid)
         return wrap
 
-    def wait(self):
-        """Loop waiting on children to die and respawning as necessary."""
-
-        LOG.debug(_('Full set of CONF:'))
-        CONF.log_opt_values(LOG, std_logging.DEBUG)
-
+    def _respawn_children(self):
         while self.running:
             wrap = self._wait_child()
             if not wrap:
@@ -284,14 +326,30 @@ class ProcessLauncher(object):
                 # (see bug #1095346)
                 eventlet.greenthread.sleep(.01)
                 continue
-
             while self.running and len(wrap.children) < wrap.workers:
                 self._start_child(wrap)
 
-        if self.sigcaught:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
-            LOG.info(_('Caught %s, stopping children'), signame)
+    def wait(self):
+        """Loop waiting on children to die and respawning as necessary."""
+
+        LOG.debug(_('Full set of CONF:'))
+        CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+        while True:
+            self.handle_signal()
+            self._respawn_children()
+            if self.sigcaught:
+                signame = {signal.SIGTERM: 'SIGTERM',
+                           signal.SIGINT: 'SIGINT',
+                           signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
+                LOG.info(_('Caught %s, stopping children'), signame)
+            if self.sigcaught != signal.SIGHUP:
+                break
+
+            for pid in self.children:
+                os.kill(pid, signal.SIGHUP)
+            self.running = True
+            self.sigcaught = None
 
         for pid in self.children:
             try:
@@ -313,15 +371,74 @@ class Service(object):
     def __init__(self, threads=1000):
         self.tg = threadgroup.ThreadGroup(threads)
 
+        # signal that the service is done shutting itself down:
+        self._done = event.Event()
+
+    def reset(self):
+        # NOTE(Fengqian): docs for Event.reset() recommend against using it
+        self._done = event.Event()
+
     def start(self):
         pass
 
     def stop(self):
         self.tg.stop()
+        self.tg.wait()
+        # Signal that service cleanup is done:
+        if not self._done.ready():
+            self._done.send()
+
+    def wait(self):
+        self._done.wait()
+
+
+class Services(object):
+
+    def __init__(self):
+        self.services = []
+        self.tg = threadgroup.ThreadGroup()
+        self.done = event.Event()
+
+    def add(self, service):
+        self.services.append(service)
+        self.tg.add_thread(self.run_service, service, self.done)
+
+    def stop(self):
+        # wait for graceful shutdown of services:
+        for service in self.services:
+            service.stop()
+            service.wait()
+
+        # Each service has performed cleanup, now signal that the run_service
+        # wrapper threads can now die:
+        if not self.done.ready():
+            self.done.send()
+
+        # reap threads:
+        self.tg.stop()
 
     def wait(self):
         self.tg.wait()
 
+    def restart(self):
+        self.stop()
+        self.done = event.Event()
+        for restart_service in self.services:
+            restart_service.reset()
+            self.tg.add_thread(self.run_service, restart_service, self.done)
+
+    @staticmethod
+    def run_service(service, done):
+        """Service start wrapper.
+
+        :param service: service to run
+        :param done: event to wait on until a shutdown is triggered
+        :returns: None
+
+        """
+        service.start()
+        done.wait()
+
 
 def launch(service, workers=None):
     if workers:
diff --git a/heat/openstack/common/sslutils.py b/heat/openstack/common/sslutils.py
new file mode 100644 (file)
index 0000000..a01258d
--- /dev/null
@@ -0,0 +1,100 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import ssl
+
+from oslo.config import cfg
+
+from heat.openstack.common.gettextutils import _  # noqa
+
+
+ssl_opts = [
+    cfg.StrOpt('ca_file',
+               default=None,
+               help="CA certificate file to use to verify "
+                    "connecting clients"),
+    cfg.StrOpt('cert_file',
+               default=None,
+               help="Certificate file to use when starting "
+                    "the server securely"),
+    cfg.StrOpt('key_file',
+               default=None,
+               help="Private key file to use when starting "
+                    "the server securely"),
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(ssl_opts, "ssl")
+
+
+def is_enabled():
+    cert_file = CONF.ssl.cert_file
+    key_file = CONF.ssl.key_file
+    ca_file = CONF.ssl.ca_file
+    use_ssl = cert_file or key_file
+
+    if cert_file and not os.path.exists(cert_file):
+        raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
+
+    if ca_file and not os.path.exists(ca_file):
+        raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
+
+    if key_file and not os.path.exists(key_file):
+        raise RuntimeError(_("Unable to find key_file : %s") % key_file)
+
+    if use_ssl and (not cert_file or not key_file):
+        raise RuntimeError(_("When running server in SSL mode, you must "
+                             "specify both a cert_file and key_file "
+                             "option value in your configuration file"))
+
+    return use_ssl
+
+
+def wrap(sock):
+    ssl_kwargs = {
+        'server_side': True,
+        'certfile': CONF.ssl.cert_file,
+        'keyfile': CONF.ssl.key_file,
+        'cert_reqs': ssl.CERT_NONE,
+    }
+
+    if CONF.ssl.ca_file:
+        ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
+        ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+
+    return ssl.wrap_socket(sock, **ssl_kwargs)
+
+
+_SSL_PROTOCOLS = {
+    "tlsv1": ssl.PROTOCOL_TLSv1,
+    "sslv23": ssl.PROTOCOL_SSLv23,
+    "sslv3": ssl.PROTOCOL_SSLv3
+}
+
+try:
+    _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
+except AttributeError:
+    pass
+
+
+def validate_ssl_version(version):
+    key = version.lower()
+    try:
+        return _SSL_PROTOCOLS[key]
+    except KeyError:
+        raise RuntimeError(_("Invalid SSL version : %s") % version)
index 9387894546d20c803fae6f20b597b965f83344f3..f9bea10789c366b80078767cdb08166073ccf1d0 100644 (file)
@@ -14,7 +14,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from eventlet import greenlet
+import eventlet
 from eventlet import greenpool
 from eventlet import greenthread
 
@@ -105,7 +105,7 @@ class ThreadGroup(object):
         for x in self.timers:
             try:
                 x.wait()
-            except greenlet.GreenletExit:
+            except eventlet.greenlet.GreenletExit:
                 pass
             except Exception as ex:
                 LOG.exception(ex)
@@ -115,7 +115,7 @@ class ThreadGroup(object):
                 continue
             try:
                 x.wait()
-            except greenlet.GreenletExit:
+            except eventlet.greenlet.GreenletExit:
                 pass
             except Exception as ex:
                 LOG.exception(ex)
index bd60489e56f0553aeeda6350f9d965312eb31da9..aa9f70807466116315c9d2478b1a0f2105e0be94 100644 (file)
@@ -49,9 +49,9 @@ def parse_isotime(timestr):
     try:
         return iso8601.parse_date(timestr)
     except iso8601.ParseError as e:
-        raise ValueError(e.message)
+        raise ValueError(unicode(e))
     except TypeError as e:
-        raise ValueError(e.message)
+        raise ValueError(unicode(e))
 
 
 def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
diff --git a/tools/config/generate_sample.sh b/tools/config/generate_sample.sh
new file mode 100755 (executable)
index 0000000..f306a61
--- /dev/null
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+
+print_hint() {
+    echo "Try \`${0##*/} --help' for more information." >&2
+}
+
+PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \
+                 --long help,base-dir:,package-name:,output-dir: -- "$@")
+
+if [ $? != 0 ] ; then print_hint ; exit 1 ; fi
+
+eval set -- "$PARSED_OPTIONS"
+
+while true; do
+    case "$1" in
+        -h|--help)
+            echo "${0##*/} [options]"
+            echo ""
+            echo "options:"
+            echo "-h, --help                show brief help"
+            echo "-b, --base-dir=DIR        Project base directory (required)"
+            echo "-p, --package-name=NAME   Project package name"
+            echo "-o, --output-dir=DIR      File output directory"
+            exit 0
+            ;;
+        -b|--base-dir)
+            shift
+            BASEDIR=`echo $1 | sed -e 's/\/*$//g'`
+            shift
+            ;;
+        -p|--package-name)
+            shift
+            PACKAGENAME=`echo $1`
+            shift
+            ;;
+        -o|--output-dir)
+            shift
+            OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'`
+            shift
+            ;;
+        --)
+            break
+            ;;
+    esac
+done
+
+if [ -z $BASEDIR ] || ! [ -d $BASEDIR ]
+then
+    echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1
+fi
+
+PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}}
+
+OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc}
+if ! [ -d $OUTPUTDIR ]
+then
+    echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2
+    exit 1
+fi
+
+BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'`
+FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \
+        -exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u)
+
+export EVENTLET_NO_GREENDNS=yes
+
+MODULEPATH=heat.openstack.common.config.generator
+OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample
+python -m $MODULEPATH $FILES > $OUTPUTFILE
index ff8b5645bbfb598c58493150076c4fec0f0d8e9a..44713c914aa034b11cfe8af74cc1d6ae0689e8c1 100644 (file)
@@ -17,7 +17,7 @@
 import os
 import sys
 
-import install_venv_common as install_venv
+import install_venv_common as install_venv  # noqa
 
 
 def first_file(file_list):