From 00e71c3d7810e2f9e12f57c0a713d2b8939bb70f Mon Sep 17 00:00:00 2001 From: Alexander Gordeev Date: Thu, 1 Aug 2013 17:22:20 +0400 Subject: [PATCH] Update Oslo to 96d1f887dda Part 1 Oslo version 96d1f887dda21b43ba4376187f31953dee6f5273 This commit just injects fresh portion of code from Oslo into Heat and fixes all the issues with new code Partially implements blueprint oslo-db-support Change-Id: I7e98c12ddf6689efc6ea6a4deab0b1c840297730 --- etc/heat/heat.conf.sample | 48 +++- heat/openstack/common/config/generator.py | 35 ++- heat/openstack/common/context.py | 2 +- heat/openstack/common/eventlet_backdoor.py | 63 +++++- heat/openstack/common/loopingcall.py | 2 +- heat/openstack/common/network_utils.py | 20 +- heat/openstack/common/notifier/api.py | 27 +-- heat/openstack/common/rpc/__init__.py | 2 +- heat/openstack/common/rpc/amqp.py | 29 ++- heat/openstack/common/rpc/common.py | 70 +++--- heat/openstack/common/rpc/impl_kombu.py | 98 +++++---- heat/openstack/common/rpc/impl_qpid.py | 58 ++++- heat/openstack/common/rpc/impl_zmq.py | 31 +-- heat/openstack/common/rpc/matchmaker.py | 14 +- heat/openstack/common/rpc/matchmaker_ring.py | 6 +- heat/openstack/common/rpc/proxy.py | 2 +- heat/openstack/common/rpc/service.py | 8 +- heat/openstack/common/service.py | 217 ++++++++++++++----- heat/openstack/common/sslutils.py | 100 +++++++++ heat/openstack/common/threadgroup.py | 6 +- heat/openstack/common/timeutils.py | 4 +- tools/config/generate_sample.sh | 69 ++++++ tools/patch_tox_venv.py | 2 +- 23 files changed, 651 insertions(+), 262 deletions(-) mode change 100755 => 100644 heat/openstack/common/config/generator.py create mode 100644 heat/openstack/common/sslutils.py create mode 100755 tools/config/generate_sample.sh diff --git a/etc/heat/heat.conf.sample b/etc/heat/heat.conf.sample index 95eaf490..fd4baf29 100644 --- a/etc/heat/heat.conf.sample +++ b/etc/heat/heat.conf.sample @@ -128,7 +128,14 @@ # Options defined in heat.openstack.common.eventlet_backdoor # -# port for eventlet backdoor to listen (integer value) +# Enable eventlet backdoor. Acceptable values are 0, , +# and :, where 0 results in listening on a random +# tcp port number; results in listening on the +# specified port number (and not enabling backdoor if that +# port is in use); and : results in listening on +# the smallest unused port number within the specified range +# of port numbers. The chosen port is displayed in the +# service's log file. (string value) #backdoor_port= @@ -292,12 +299,24 @@ #control_exchange=openstack +# +# Options defined in heat.openstack.common.rpc.amqp +# + +# Use durable queues in amqp. (boolean value) +#amqp_durable_queues=false + +# Auto-delete queues in amqp. (boolean value) +#amqp_auto_delete=false + + # # Options defined in heat.openstack.common.rpc.impl_kombu # -# SSL version to use (valid only if SSL enabled) (string -# value) +# SSL version to use (valid only if SSL enabled). valid values +# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some +# distributions (string value) #kombu_ssl_version= # SSL key file (valid only if SSL enabled) (string value) @@ -346,9 +365,6 @@ # value) #rabbit_max_retries=0 -# use durable queues in RabbitMQ (boolean value) -#rabbit_durable_queues=false - # use H/A queues in RabbitMQ (x-ha-policy: all).You need to # wipe RabbitMQ database when changing this option. (boolean # value) @@ -431,6 +447,25 @@ #matchmaker_heartbeat_ttl=600 +[ssl] + +# +# Options defined in heat.openstack.common.sslutils +# + +# CA certificate file to use to verify connecting clients +# (string value) +#ca_file= + +# Certificate file to use when starting the server securely +# (string value) +#cert_file= + +# Private key file to use when starting the server securely +# (string value) +#key_file= + + [paste_deploy] # @@ -507,4 +542,3 @@ #ringfile=/etc/oslo/matchmaker_ring.json -# Total option count: 109 diff --git a/heat/openstack/common/config/generator.py b/heat/openstack/common/config/generator.py old mode 100755 new mode 100644 index 30c08a83..27e217e8 --- a/heat/openstack/common/config/generator.py +++ b/heat/openstack/common/config/generator.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 SINA Corporation @@ -16,8 +15,6 @@ # License for the specific language governing permissions and limitations # under the License. # -# @author: Zhongyue Luo, SINA Corporation. -# """Extracts OpenStack config option info from module(s).""" @@ -53,7 +50,6 @@ OPT_TYPES = { MULTISTROPT: 'multi valued', } -OPTION_COUNT = 0 OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT, FLOATOPT, LISTOPT, MULTISTROPT])) @@ -100,8 +96,6 @@ def generate(srcfiles): for group, opts in opts_by_group.items(): print_group_opts(group, opts) - print("# Total option count: %d" % OPTION_COUNT) - def _import_module(mod_str): try: @@ -166,9 +160,7 @@ def _list_opts(obj): def print_group_opts(group, opts_by_module): print("[%s]" % group) print('') - global OPTION_COUNT for mod, opts in opts_by_module: - OPTION_COUNT += len(opts) print('#') print('# Options defined in %s' % mod) print('#') @@ -189,24 +181,24 @@ def _get_my_ip(): return None -def _sanitize_default(s): +def _sanitize_default(name, value): """Set up a reasonably sensible default for pybasedir, my_ip and host.""" - if s.startswith(sys.prefix): + if value.startswith(sys.prefix): # NOTE(jd) Don't use os.path.join, because it is likely to think the # second part is an absolute pathname and therefore drop the first # part. - s = os.path.normpath("/usr/" + s[len(sys.prefix):]) - elif s.startswith(BASEDIR): - return s.replace(BASEDIR, '/usr/lib/python/site-packages') - elif BASEDIR in s: - return s.replace(BASEDIR, '') - elif s == _get_my_ip(): + value = os.path.normpath("/usr/" + value[len(sys.prefix):]) + elif value.startswith(BASEDIR): + return value.replace(BASEDIR, '/usr/lib/python/site-packages') + elif BASEDIR in value: + return value.replace(BASEDIR, '') + elif value == _get_my_ip(): return '10.0.0.1' - elif s == socket.gethostname(): + elif value == socket.gethostname() and 'host' in name: return 'heat' - elif s.strip() != s: - return '"%s"' % s - return s + elif value.strip() != value: + return '"%s"' % value + return value def _print_opt(opt): @@ -227,7 +219,8 @@ def _print_opt(opt): print('#%s=' % opt_name) elif opt_type == STROPT: assert(isinstance(opt_default, basestring)) - print('#%s=%s' % (opt_name, _sanitize_default(opt_default))) + print('#%s=%s' % (opt_name, _sanitize_default(opt_name, + opt_default))) elif opt_type == BOOLOPT: assert(isinstance(opt_default, bool)) print('#%s=%s' % (opt_name, str(opt_default).lower())) diff --git a/heat/openstack/common/context.py b/heat/openstack/common/context.py index b125af71..1d9d7eb7 100644 --- a/heat/openstack/common/context.py +++ b/heat/openstack/common/context.py @@ -61,7 +61,7 @@ class RequestContext(object): 'request_id': self.request_id} -def get_admin_context(show_deleted="no"): +def get_admin_context(show_deleted=False): context = RequestContext(None, tenant=None, is_admin=True, diff --git a/heat/openstack/common/eventlet_backdoor.py b/heat/openstack/common/eventlet_backdoor.py index 57b89ae9..7612d55b 100644 --- a/heat/openstack/common/eventlet_backdoor.py +++ b/heat/openstack/common/eventlet_backdoor.py @@ -18,8 +18,11 @@ from __future__ import print_function +import errno import gc +import os import pprint +import socket import sys import traceback @@ -28,14 +31,34 @@ import eventlet.backdoor import greenlet from oslo.config import cfg +from heat.openstack.common.gettextutils import _ # noqa +from heat.openstack.common import log as logging + +help_for_backdoor_port = ( + "Acceptable values are 0, , and :, where 0 results " + "in listening on a random tcp port number; results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and : results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") eventlet_backdoor_opts = [ - cfg.IntOpt('backdoor_port', + cfg.StrOpt('backdoor_port', default=None, - help='port for eventlet backdoor to listen') + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) ] CONF = cfg.CONF CONF.register_opts(eventlet_backdoor_opts) +LOG = logging.getLogger(__name__) + + +class EventletBackdoorConfigValueError(Exception): + def __init__(self, port_range, help_msg, ex): + msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' + '%(help)s' % + {'range': port_range, 'ex': ex, 'help': help_msg}) + super(EventletBackdoorConfigValueError, self).__init__(msg) + self.port_range = port_range def _dont_use_this(): @@ -60,6 +83,33 @@ def _print_nativethreads(): print() +def _parse_port_range(port_range): + if ':' not in port_range: + start, end = port_range, port_range + else: + start, end = port_range.split(':', 1) + try: + start, end = int(start), int(end) + if end < start: + raise ValueError + return start, end + except ValueError as ex: + raise EventletBackdoorConfigValueError(port_range, ex, + help_for_backdoor_port) + + +def _listen(host, start_port, end_port, listen_func): + try_port = start_port + while True: + try: + return listen_func((host, try_port)) + except socket.error as exc: + if (exc.errno != errno.EADDRINUSE or + try_port >= end_port): + raise + try_port += 1 + + def initialize_if_enabled(): backdoor_locals = { 'exit': _dont_use_this, # So we don't exit the entire process @@ -72,6 +122,8 @@ def initialize_if_enabled(): if CONF.backdoor_port is None: return None + start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) + # NOTE(johannes): The standard sys.displayhook will print the value of # the last expression and set it to __builtin__._, which overwrites # the __builtin__._ that gettext sets. Let's switch to using pprint @@ -82,8 +134,13 @@ def initialize_if_enabled(): pprint.pprint(val) sys.displayhook = displayhook - sock = eventlet.listen(('localhost', CONF.backdoor_port)) + sock = _listen('localhost', start_port, end_port, eventlet.listen) + + # In the case of backdoor port being zero, a port number is assigned by + # listen(). In any case, pull the port number out here. port = sock.getsockname()[1] + LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()}) eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, locals=backdoor_locals) return port diff --git a/heat/openstack/common/loopingcall.py b/heat/openstack/common/loopingcall.py index 0fa60bd9..1cd24842 100644 --- a/heat/openstack/common/loopingcall.py +++ b/heat/openstack/common/loopingcall.py @@ -22,7 +22,7 @@ import sys from eventlet import event from eventlet import greenthread -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import log as logging from heat.openstack.common import timeutils diff --git a/heat/openstack/common/network_utils.py b/heat/openstack/common/network_utils.py index d049d658..dbed1ceb 100644 --- a/heat/openstack/common/network_utils.py +++ b/heat/openstack/common/network_utils.py @@ -19,10 +19,7 @@ Network-related utilities and helper functions. """ -from heat.openstack.common import log as logging - - -LOG = logging.getLogger(__name__) +import urlparse def parse_host_port(address, default_port=None): @@ -67,3 +64,18 @@ def parse_host_port(address, default_port=None): port = default_port return (host, None if port is None else int(port)) + + +def urlsplit(url, scheme='', allow_fragments=True): + """Parse a URL using urlparse.urlsplit(), splitting query and fragments. + This function papers over Python issue9374 when needed. + + The parameters are the same as urlparse.urlsplit. + """ + scheme, netloc, path, query, fragment = urlparse.urlsplit( + url, scheme, allow_fragments) + if allow_fragments and '#' in path: + path, fragment = path.split('#', 1) + if '?' in path: + path, query = path.split('?', 1) + return urlparse.SplitResult(scheme, netloc, path, query, fragment) diff --git a/heat/openstack/common/notifier/api.py b/heat/openstack/common/notifier/api.py index 428e5068..cd1e7b4b 100644 --- a/heat/openstack/common/notifier/api.py +++ b/heat/openstack/common/notifier/api.py @@ -157,29 +157,16 @@ def _get_drivers(): if _drivers is None: _drivers = {} for notification_driver in CONF.notification_driver: - add_driver(notification_driver) - + try: + driver = importutils.import_module(notification_driver) + _drivers[notification_driver] = driver + except ImportError: + LOG.exception(_("Failed to load notifier %s. " + "These notifications will not be sent.") % + notification_driver) return _drivers.values() -def add_driver(notification_driver): - """Add a notification driver at runtime.""" - # Make sure the driver list is initialized. - _get_drivers() - if isinstance(notification_driver, basestring): - # Load and add - try: - driver = importutils.import_module(notification_driver) - _drivers[notification_driver] = driver - except ImportError: - LOG.exception(_("Failed to load notifier %s. " - "These notifications will not be sent.") % - notification_driver) - else: - # Driver is already loaded; just add the object. - _drivers[notification_driver] = notification_driver - - def _reset_drivers(): """Used by unit tests to reset the drivers.""" global _drivers diff --git a/heat/openstack/common/rpc/__init__.py b/heat/openstack/common/rpc/__init__.py index 6907f726..ab62f93d 100644 --- a/heat/openstack/common/rpc/__init__.py +++ b/heat/openstack/common/rpc/__init__.py @@ -29,7 +29,7 @@ import inspect from oslo.config import cfg -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import importutils from heat.openstack.common import local from heat.openstack.common import log as logging diff --git a/heat/openstack/common/rpc/amqp.py b/heat/openstack/common/rpc/amqp.py index ac738774..36501c42 100644 --- a/heat/openstack/common/rpc/amqp.py +++ b/heat/openstack/common/rpc/amqp.py @@ -34,14 +34,28 @@ from eventlet import greenpool from eventlet import pools from eventlet import queue from eventlet import semaphore +from oslo.config import cfg from heat.openstack.common import excutils -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import local from heat.openstack.common import log as logging from heat.openstack.common.rpc import common as rpc_common +amqp_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), +] + +cfg.CONF.register_opts(amqp_opts) + UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -151,11 +165,13 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) - def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + def join_consumer_pool(self, callback, pool_name, topic, exchange_name, + ack_on_error=True): self.connection.join_consumer_pool(callback, pool_name, topic, - exchange_name) + exchange_name, + ack_on_error) def consume_in_thread(self): self.connection.consume_in_thread() @@ -219,12 +235,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, failure = rpc_common.serialize_remote_exception(failure, log_failure) - try: - msg = {'result': reply, 'failure': failure} - except TypeError: - msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + msg = {'result': reply, 'failure': failure} if ending: msg['ending'] = True _add_unique_id(msg) diff --git a/heat/openstack/common/rpc/common.py b/heat/openstack/common/rpc/common.py index 36516112..bc19c023 100644 --- a/heat/openstack/common/rpc/common.py +++ b/heat/openstack/common/rpc/common.py @@ -24,6 +24,7 @@ import traceback from oslo.config import cfg import six +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import importutils from heat.openstack.common import jsonutils from heat.openstack.common import local @@ -73,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote' class RPCException(Exception): - message = _("An unknown RPC related exception occurred.") + msg_fmt = _("An unknown RPC related exception occurred.") def __init__(self, message=None, **kwargs): self.kwargs = kwargs if not message: try: - message = self.message % kwargs + message = self.msg_fmt % kwargs except Exception: # kwargs doesn't match a variable in the message @@ -89,7 +90,7 @@ class RPCException(Exception): for name, value in kwargs.iteritems(): LOG.error("%s: %s" % (name, value)) # at least get the core message out if something happened - message = self.message + message = self.msg_fmt super(RPCException, self).__init__(message) @@ -103,7 +104,7 @@ class RemoteError(RPCException): contains all of the relevant info. """ - message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") def __init__(self, exc_type=None, value=None, traceback=None): self.exc_type = exc_type @@ -120,7 +121,7 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _('Timeout while waiting on RPC response - ' + msg_fmt = _('Timeout while waiting on RPC response - ' 'topic: "%(topic)s", RPC method: "%(method)s" ' 'info: "%(info)s"') @@ -143,25 +144,25 @@ class Timeout(RPCException): class DuplicateMessageError(RPCException): - message = _("Found duplicate message(%(msg_id)s). Skipping it.") + msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") class InvalidRPCConnectionReuse(RPCException): - message = _("Invalid reuse of an RPC connection.") + msg_fmt = _("Invalid reuse of an RPC connection.") class UnsupportedRpcVersion(RPCException): - message = _("Specified RPC version, %(version)s, not supported by " + msg_fmt = _("Specified RPC version, %(version)s, not supported by " "this endpoint.") class UnsupportedRpcEnvelopeVersion(RPCException): - message = _("Specified RPC envelope version, %(version)s, " + msg_fmt = _("Specified RPC envelope version, %(version)s, " "not supported by this endpoint.") class RpcVersionCapError(RPCException): - message = _("Specified RPC version cap, %(version_cap)s, is too low") + msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") class Connection(object): @@ -260,41 +261,20 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = {'set_admin_password': [('args', 'new_pass')], - 'run_instance': [('args', 'admin_password')], - 'route_message': [('args', 'message', 'args', 'method_info', - 'method_kwargs', 'password'), - ('args', 'message', 'args', 'method_info', - 'method_kwargs', 'admin_password')]} - - has_method = 'method' in msg_data and msg_data['method'] in SANITIZE - has_context_token = '_context_auth_token' in msg_data - has_token = 'auth_token' in msg_data - - if not any([has_method, has_context_token, has_token]): - return log_func(msg, msg_data) - - msg_data = copy.deepcopy(msg_data) - - if has_method: - for arg in SANITIZE.get(msg_data['method'], []): - try: - d = msg_data - for elem in arg[:-1]: - d = d[elem] - d[arg[-1]] = '' - except KeyError as e: - LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), - {'item': arg, - 'err': e}) - - if has_context_token: - msg_data['_context_auth_token'] = '' - - if has_token: - msg_data['auth_token'] = '' - - return log_func(msg, msg_data) + SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] + + def _fix_passwords(d): + """Sanitizes the password fields in the dictionary.""" + for k in d.iterkeys(): + if k.lower().find('password') != -1: + d[k] = '' + elif k.lower() in SANITIZE: + d[k] = '' + elif isinstance(d[k], dict): + _fix_passwords(d[k]) + return d + + return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) def serialize_remote_exception(failure_info, log_failure=True): diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py index c29007db..b4ab92ba 100644 --- a/heat/openstack/common/rpc/impl_kombu.py +++ b/heat/openstack/common/rpc/impl_kombu.py @@ -18,7 +18,6 @@ import functools import itertools import socket import ssl -import sys import time import uuid @@ -30,15 +29,20 @@ import kombu.entity import kombu.messaging from oslo.config import cfg -from heat.openstack.common.gettextutils import _ +from heat.openstack.common import excutils +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import network_utils from heat.openstack.common.rpc import amqp as rpc_amqp from heat.openstack.common.rpc import common as rpc_common +from heat.openstack.common import sslutils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', default='', - help='SSL version to use (valid only if SSL enabled)'), + help='SSL version to use (valid only if SSL enabled). ' + 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may ' + 'be available on some distributions' + ), cfg.StrOpt('kombu_ssl_keyfile', default='', help='SSL key file (valid only if SSL enabled)'), @@ -82,9 +86,6 @@ kombu_opts = [ default=0, help='maximum retries with trying to connect to RabbitMQ ' '(the default of 0 implies an infinite retry count)'), - cfg.BoolOpt('rabbit_durable_queues', - default=False, - help='use durable queues in RabbitMQ'), cfg.BoolOpt('rabbit_ha_queues', default=False, help='use H/A queues in RabbitMQ (x-ha-policy: all).' @@ -129,6 +130,7 @@ class ConsumerBase(object): self.tag = str(tag) self.kwargs = kwargs self.queue = None + self.ack_on_error = kwargs.get('ack_on_error', True) self.reconnect(channel) def reconnect(self, channel): @@ -138,6 +140,36 @@ class ConsumerBase(object): self.queue = kombu.entity.Queue(**self.kwargs) self.queue.declare() + def _callback_handler(self, message, callback): + """Call callback with deserialized message. + + Messages that are processed without exception are ack'ed. + + If the message processing generates an exception, it will be + ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed. + Rejection is better than waiting for the message to timeout. + Rejected messages are immediately requeued. + """ + + ack_msg = False + try: + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) + ack_msg = True + except Exception: + if self.ack_on_error: + ack_msg = True + LOG.exception(_("Failed to process message" + " ... skipping it.")) + else: + LOG.exception(_("Failed to process message" + " ... will requeue.")) + finally: + if ack_msg: + message.ack() + else: + message.reject() + def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the @@ -150,8 +182,6 @@ class ConsumerBase(object): If kwargs['nowait'] is True, then this call will block until a message is read. - Messages will automatically be acked if the callback doesn't - raise an exception """ options = {'consumer_tag': self.tag} @@ -162,13 +192,7 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) - except Exception: - LOG.exception(_("Failed to process message... skipping it.")) - finally: - message.ack() + self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -233,9 +257,9 @@ class TopicConsumer(ConsumerBase): Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': conf.rabbit_durable_queues, + options = {'durable': conf.amqp_durable_queues, 'queue_arguments': _get_queue_arguments(conf), - 'auto_delete': False, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) @@ -339,8 +363,8 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ - options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) @@ -370,7 +394,7 @@ class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'.""" def __init__(self, conf, channel, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) @@ -454,7 +478,8 @@ class Connection(object): # http://docs.python.org/library/ssl.html - ssl.wrap_socket if self.conf.kombu_ssl_version: - ssl_params['ssl_version'] = self.conf.kombu_ssl_version + ssl_params['ssl_version'] = sslutils.validate_ssl_version( + self.conf.kombu_ssl_version) if self.conf.kombu_ssl_keyfile: ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile if self.conf.kombu_ssl_certfile: @@ -465,12 +490,8 @@ class Connection(object): # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED - if not ssl_params: - # Just have the default behavior - return True - else: - # Return the extended behavior - return ssl_params + # Return the extended behavior or just have the default behavior + return ssl_params or True def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have @@ -537,13 +558,11 @@ class Connection(object): log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.error(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) - # NOTE(comstud): Copied from original code. There's - # really no better recourse because if this was a queue we - # need to consume on, we have no way to consume anymore. - sys.exit(1) + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info + LOG.error(msg) + raise rpc_common.RPCException(msg) if attempt == 1: sleep_time = self.interval_start or 1 @@ -635,8 +654,8 @@ class Connection(object): def _consume(): if info['do_consume']: - queues_head = self.consumers[:-1] - queues_tail = self.consumers[-1] + queues_head = self.consumers[:-1] # not fanout. + queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) @@ -685,11 +704,12 @@ class Connection(object): self.declare_consumer(DirectConsumer, topic, callback) def declare_topic_consumer(self, topic, callback=None, queue_name=None, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, exchange_name=exchange_name, + ack_on_error=ack_on_error, ), topic, callback) @@ -724,6 +744,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -754,7 +775,7 @@ class Connection(object): self.declare_topic_consumer(topic, proxy_cb, pool_name) def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -775,6 +796,7 @@ class Connection(object): topic=topic, exchange_name=exchange_name, callback=callback_wrapper, + ack_on_error=ack_on_error, ) diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py index b98d84bd..c3fc593b 100644 --- a/heat/openstack/common/rpc/impl_qpid.py +++ b/heat/openstack/common/rpc/impl_qpid.py @@ -24,7 +24,8 @@ import eventlet import greenlet from oslo.config import cfg -from heat.openstack.common.gettextutils import _ +from heat.openstack.common import excutils +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import importutils from heat.openstack.common import jsonutils from heat.openstack.common import log as logging @@ -118,10 +119,17 @@ class ConsumerBase(object): self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) - self.reconnect(session) + self.connect(session) + + def connect(self, session): + """Declare the reciever on connect.""" + self._declare_receiver(session) def reconnect(self, session): """Re-declare the receiver after a qpid reconnect.""" + self._declare_receiver(session) + + def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 @@ -152,11 +160,15 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: + # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): return self.receiver + def get_node_name(self): + return self.address.split(';')[0] + class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'.""" @@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + super(DirectConsumer, self).__init__( + session, callback, + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + }) class TopicConsumer(ConsumerBase): @@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase): """ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) - super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (exchange_name, topic), - {}, name or topic, {}) + super(TopicConsumer, self).__init__( + session, callback, + "%s/%s" % (exchange_name, topic), + {}, name or topic, + { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + }) class FanoutConsumer(ConsumerBase): @@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase): 'topic' is the topic to listen on 'callback' is the callback to call when messages are received """ + self.conf = conf super(FanoutConsumer, self).__init__( session, callback, @@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase): "%s_fanout_%s" % (topic, uuid.uuid4().hex), {"exclusive": True}) + def reconnect(self, session): + topic = self.get_node_name().rpartition('_fanout')[0] + params = { + 'session': session, + 'topic': topic, + 'callback': self.callback, + } + + self.__init__(conf=self.conf, **params) + + super(FanoutConsumer, self).reconnect(session) + class Publisher(object): """Base Publisher class.""" @@ -575,6 +610,7 @@ class Connection(object): def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions def _consumer_thread(): try: self.consume() @@ -615,7 +651,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. diff --git a/heat/openstack/common/rpc/impl_zmq.py b/heat/openstack/common/rpc/impl_zmq.py index 1a2f4b3e..44c1267d 100644 --- a/heat/openstack/common/rpc/impl_zmq.py +++ b/heat/openstack/common/rpc/impl_zmq.py @@ -27,7 +27,7 @@ import greenlet from oslo.config import cfg from heat.openstack.common import excutils -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import importutils from heat.openstack.common import jsonutils from heat.openstack.common.rpc import common as rpc_common @@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase): def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.mapping = {} self.proxies = {} self.threads = [] self.sockets = [] @@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase): self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) - def register(self, proxy, in_addr, zmq_type_in, out_addr=None, - zmq_type_out=None, in_bind=True, out_bind=True, - subscribe=None): + def register(self, proxy, in_addr, zmq_type_in, + in_bind=True, subscribe=None): LOG.info(_("Registering reactor")) @@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("In reactor registered")) - if not out_addr: - return - - if zmq_type_out not in (zmq.PUSH, zmq.PUB): - raise RPCException("Bad output socktype") - - # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) - - self.mapping[inq] = outq - self.mapping[outq] = inq - self.sockets.append(outq) - - LOG.info(_("Out reactor registered")) - def consume_in_thread(self): def _consume(sock): LOG.info(_("Consuming socket")) @@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor): try: self.register(consumption_proxy, consume_in, - zmq.PULL, - out_bind=True) + zmq.PULL) except zmq.ZMQError: if os.access(ipc_dir, os.X_OK): with excutils.save_and_reraise_exception(): @@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) - if sock in self.mapping: - LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { - 'data': data}) - self.mapping[sock].send(data) - return proxy = self.proxies[sock] diff --git a/heat/openstack/common/rpc/matchmaker.py b/heat/openstack/common/rpc/matchmaker.py index f822c715..41d58177 100644 --- a/heat/openstack/common/rpc/matchmaker.py +++ b/heat/openstack/common/rpc/matchmaker.py @@ -23,7 +23,7 @@ import contextlib import eventlet from oslo.config import cfg -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import log as logging @@ -248,9 +248,7 @@ class DirectBinding(Binding): that it maps directly to a host, thus direct. """ def test(self, key): - if '.' in key: - return True - return False + return '.' in key class TopicBinding(Binding): @@ -262,17 +260,13 @@ class TopicBinding(Binding): matches that of a direct exchange. """ def test(self, key): - if '.' not in key: - return True - return False + return '.' not in key class FanoutBinding(Binding): """Match on fanout keys, where key starts with 'fanout.' string.""" def test(self, key): - if key.startswith('fanout~'): - return True - return False + return key.startswith('fanout~') class StubExchange(Exchange): diff --git a/heat/openstack/common/rpc/matchmaker_ring.py b/heat/openstack/common/rpc/matchmaker_ring.py index 1b623b2a..94749371 100644 --- a/heat/openstack/common/rpc/matchmaker_ring.py +++ b/heat/openstack/common/rpc/matchmaker_ring.py @@ -23,7 +23,7 @@ import json from oslo.config import cfg -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import log as logging from heat.openstack.common.rpc import matchmaker as mm @@ -63,9 +63,7 @@ class RingExchange(mm.Exchange): self.ring0[k] = itertools.cycle(self.ring[k]) def _ring_has(self, key): - if key in self.ring0: - return True - return False + return key in self.ring0 class RoundRobinRingExchange(RingExchange): diff --git a/heat/openstack/common/rpc/proxy.py b/heat/openstack/common/rpc/proxy.py index 178f40e8..f2957245 100644 --- a/heat/openstack/common/rpc/proxy.py +++ b/heat/openstack/common/rpc/proxy.py @@ -69,7 +69,7 @@ class RpcProxy(object): v = vers if vers else self.default_version if (self.version_cap and not rpc_common.version_is_compatible(self.version_cap, v)): - raise rpc_common.RpcVersionCapError(version=self.version_cap) + raise rpc_common.RpcVersionCapError(version_cap=self.version_cap) msg['version'] = v def _get_topic(self, topic): diff --git a/heat/openstack/common/rpc/service.py b/heat/openstack/common/rpc/service.py index 7a02bede..cd983782 100644 --- a/heat/openstack/common/rpc/service.py +++ b/heat/openstack/common/rpc/service.py @@ -17,7 +17,7 @@ # License for the specific language governing permissions and limitations # under the License. -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import log as logging from heat.openstack.common import rpc from heat.openstack.common.rpc import dispatcher as rpc_dispatcher @@ -32,10 +32,11 @@ class Service(service.Service): A service enables rpc by listening to queues based on topic and host. """ - def __init__(self, host, topic, manager=None): + def __init__(self, host, topic, manager=None, serializer=None): super(Service, self).__init__() self.host = host self.topic = topic + self.serializer = serializer if manager is None: self.manager = self else: @@ -48,7 +49,8 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], + self.serializer) # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) diff --git a/heat/openstack/common/service.py b/heat/openstack/common/service.py index 0f61a68d..e4a29a8b 100644 --- a/heat/openstack/common/service.py +++ b/heat/openstack/common/service.py @@ -27,11 +27,12 @@ import sys import time import eventlet +from eventlet import event import logging as std_logging from oslo.config import cfg from heat.openstack.common import eventlet_backdoor -from heat.openstack.common.gettextutils import _ +from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common import importutils from heat.openstack.common import log as logging from heat.openstack.common import threadgroup @@ -51,20 +52,9 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup() + self.services = Services() self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - @staticmethod - def run_service(service): - """Start and wait for a service to finish. - - :param service: service to run and wait for. - :returns: None - - """ - service.start() - service.wait() - def launch_service(self, service): """Load and start the given service. @@ -73,7 +63,7 @@ class Launcher(object): """ service.backdoor_port = self.backdoor_port - self._services.add_thread(self.run_service, service) + self.services.add(service) def stop(self): """Stop all services which are currently running. @@ -81,7 +71,7 @@ class Launcher(object): :returns: None """ - self._services.stop() + self.services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -89,7 +79,16 @@ class Launcher(object): :returns: None """ - self._services.wait() + self.services.wait() + + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() class SignalExit(SystemExit): @@ -103,31 +102,51 @@ class ServiceLauncher(Launcher): # Allow the process to be killed again and die from natural causes signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGHUP, signal.SIG_DFL) raise SignalExit(signo) - def wait(self): + def handle_signal(self): signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGHUP, self._handle_signal) + + def _wait_for_exit_or_signal(self): + status = None + signo = 0 LOG.debug(_('Full set of CONF:')) CONF.log_opt_values(LOG, std_logging.DEBUG) - status = None try: super(ServiceLauncher, self).wait() except SignalExit as exc: signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] + signal.SIGINT: 'SIGINT', + signal.SIGHUP: 'SIGHUP'}[exc.signo] LOG.info(_('Caught %s, exiting'), signame) status = exc.code + signo = exc.signo except SystemExit as exc: status = exc.code finally: - if rpc: - rpc.cleanup() self.stop() - return status + if rpc: + try: + rpc.cleanup() + except Exception: + # We're shutting down, so it doesn't matter at this point. + LOG.exception(_('Exception during rpc cleanup.')) + + return status, signo + + def wait(self): + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal() + if signo != signal.SIGHUP: + return status + self.restart() class ServiceWrapper(object): @@ -145,9 +164,12 @@ class ProcessLauncher(object): self.running = True rfd, self.writepipe = os.pipe() self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() + def handle_signal(self): signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGHUP, self._handle_signal) def _handle_signal(self, signo, frame): self.sigcaught = signo @@ -156,6 +178,7 @@ class ProcessLauncher(object): # Allow the process to be killed again and die from natural causes signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGHUP, signal.SIG_DFL) def _pipe_watcher(self): # This will block until the write end is closed when the parent @@ -166,16 +189,47 @@ class ProcessLauncher(object): sys.exit(1) - def _child_process(self, service): + def _child_process_handle_signal(self): # Setup child signal handlers differently def _sigterm(*args): signal.signal(signal.SIGTERM, signal.SIG_DFL) raise SignalExit(signal.SIGTERM) + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + signal.signal(signal.SIGTERM, _sigterm) + signal.signal(signal.SIGHUP, _sighup) # Block SIGINT and let the parent send us a SIGTERM signal.signal(signal.SIGINT, signal.SIG_IGN) + def _child_wait_for_exit_or_signal(self, launcher): + status = None + signo = 0 + + try: + launcher.wait() + except SignalExit as exc: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT', + signal.SIGHUP: 'SIGHUP'}[exc.signo] + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + # Reopen the eventlet hub to make sure we don't share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() @@ -189,7 +243,8 @@ class ProcessLauncher(object): random.seed() launcher = Launcher() - launcher.run_service(service) + launcher.launch_service(service) + return launcher def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: @@ -210,21 +265,13 @@ class ProcessLauncher(object): # NOTE(johannes): All exceptions are caught to ensure this # doesn't fallback into the loop spawning children. It would # be bad for a child to spawn more children. - status = 0 - try: - self._child_process(wrap.service) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_('Unhandled exception')) - status = 2 - finally: - wrap.service.stop() + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if signo != signal.SIGHUP: + break + launcher.restart() os._exit(status) @@ -270,12 +317,7 @@ class ProcessLauncher(object): wrap.children.remove(pid) return wrap - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - - LOG.debug(_('Full set of CONF:')) - CONF.log_opt_values(LOG, std_logging.DEBUG) - + def _respawn_children(self): while self.running: wrap = self._wait_child() if not wrap: @@ -284,14 +326,30 @@ class ProcessLauncher(object): # (see bug #1095346) eventlet.greenthread.sleep(.01) continue - while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap) - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_('Caught %s, stopping children'), signame) + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + + while True: + self.handle_signal() + self._respawn_children() + if self.sigcaught: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT', + signal.SIGHUP: 'SIGHUP'}[self.sigcaught] + LOG.info(_('Caught %s, stopping children'), signame) + if self.sigcaught != signal.SIGHUP: + break + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None for pid in self.children: try: @@ -313,15 +371,74 @@ class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) + # signal that the service is done shutting itself down: + self._done = event.Event() + + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + def start(self): pass def stop(self): self.tg.stop() + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() def wait(self): self.tg.wait() + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + def launch(service, workers=None): if workers: diff --git a/heat/openstack/common/sslutils.py b/heat/openstack/common/sslutils.py new file mode 100644 index 00000000..a01258d0 --- /dev/null +++ b/heat/openstack/common/sslutils.py @@ -0,0 +1,100 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import ssl + +from oslo.config import cfg + +from heat.openstack.common.gettextutils import _ # noqa + + +ssl_opts = [ + cfg.StrOpt('ca_file', + default=None, + help="CA certificate file to use to verify " + "connecting clients"), + cfg.StrOpt('cert_file', + default=None, + help="Certificate file to use when starting " + "the server securely"), + cfg.StrOpt('key_file', + default=None, + help="Private key file to use when starting " + "the server securely"), +] + + +CONF = cfg.CONF +CONF.register_opts(ssl_opts, "ssl") + + +def is_enabled(): + cert_file = CONF.ssl.cert_file + key_file = CONF.ssl.key_file + ca_file = CONF.ssl.ca_file + use_ssl = cert_file or key_file + + if cert_file and not os.path.exists(cert_file): + raise RuntimeError(_("Unable to find cert_file : %s") % cert_file) + + if ca_file and not os.path.exists(ca_file): + raise RuntimeError(_("Unable to find ca_file : %s") % ca_file) + + if key_file and not os.path.exists(key_file): + raise RuntimeError(_("Unable to find key_file : %s") % key_file) + + if use_ssl and (not cert_file or not key_file): + raise RuntimeError(_("When running server in SSL mode, you must " + "specify both a cert_file and key_file " + "option value in your configuration file")) + + return use_ssl + + +def wrap(sock): + ssl_kwargs = { + 'server_side': True, + 'certfile': CONF.ssl.cert_file, + 'keyfile': CONF.ssl.key_file, + 'cert_reqs': ssl.CERT_NONE, + } + + if CONF.ssl.ca_file: + ssl_kwargs['ca_certs'] = CONF.ssl.ca_file + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + + return ssl.wrap_socket(sock, **ssl_kwargs) + + +_SSL_PROTOCOLS = { + "tlsv1": ssl.PROTOCOL_TLSv1, + "sslv23": ssl.PROTOCOL_SSLv23, + "sslv3": ssl.PROTOCOL_SSLv3 +} + +try: + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 +except AttributeError: + pass + + +def validate_ssl_version(version): + key = version.lower() + try: + return _SSL_PROTOCOLS[key] + except KeyError: + raise RuntimeError(_("Invalid SSL version : %s") % version) diff --git a/heat/openstack/common/threadgroup.py b/heat/openstack/common/threadgroup.py index 93878945..f9bea107 100644 --- a/heat/openstack/common/threadgroup.py +++ b/heat/openstack/common/threadgroup.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from eventlet import greenlet +import eventlet from eventlet import greenpool from eventlet import greenthread @@ -105,7 +105,7 @@ class ThreadGroup(object): for x in self.timers: try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) @@ -115,7 +115,7 @@ class ThreadGroup(object): continue try: x.wait() - except greenlet.GreenletExit: + except eventlet.greenlet.GreenletExit: pass except Exception as ex: LOG.exception(ex) diff --git a/heat/openstack/common/timeutils.py b/heat/openstack/common/timeutils.py index bd60489e..aa9f7080 100644 --- a/heat/openstack/common/timeutils.py +++ b/heat/openstack/common/timeutils.py @@ -49,9 +49,9 @@ def parse_isotime(timestr): try: return iso8601.parse_date(timestr) except iso8601.ParseError as e: - raise ValueError(e.message) + raise ValueError(unicode(e)) except TypeError as e: - raise ValueError(e.message) + raise ValueError(unicode(e)) def strtime(at=None, fmt=PERFECT_TIME_FORMAT): diff --git a/tools/config/generate_sample.sh b/tools/config/generate_sample.sh new file mode 100755 index 00000000..f306a610 --- /dev/null +++ b/tools/config/generate_sample.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +print_hint() { + echo "Try \`${0##*/} --help' for more information." >&2 +} + +PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \ + --long help,base-dir:,package-name:,output-dir: -- "$@") + +if [ $? != 0 ] ; then print_hint ; exit 1 ; fi + +eval set -- "$PARSED_OPTIONS" + +while true; do + case "$1" in + -h|--help) + echo "${0##*/} [options]" + echo "" + echo "options:" + echo "-h, --help show brief help" + echo "-b, --base-dir=DIR Project base directory (required)" + echo "-p, --package-name=NAME Project package name" + echo "-o, --output-dir=DIR File output directory" + exit 0 + ;; + -b|--base-dir) + shift + BASEDIR=`echo $1 | sed -e 's/\/*$//g'` + shift + ;; + -p|--package-name) + shift + PACKAGENAME=`echo $1` + shift + ;; + -o|--output-dir) + shift + OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'` + shift + ;; + --) + break + ;; + esac +done + +if [ -z $BASEDIR ] || ! [ -d $BASEDIR ] +then + echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1 +fi + +PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}} + +OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc} +if ! [ -d $OUTPUTDIR ] +then + echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2 + exit 1 +fi + +BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'` +FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \ + -exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u) + +export EVENTLET_NO_GREENDNS=yes + +MODULEPATH=heat.openstack.common.config.generator +OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample +python -m $MODULEPATH $FILES > $OUTPUTFILE diff --git a/tools/patch_tox_venv.py b/tools/patch_tox_venv.py index ff8b5645..44713c91 100644 --- a/tools/patch_tox_venv.py +++ b/tools/patch_tox_venv.py @@ -17,7 +17,7 @@ import os import sys -import install_venv_common as install_venv +import install_venv_common as install_venv # noqa def first_file(file_list): -- 2.45.2