# Options defined in heat.openstack.common.eventlet_backdoor
#
-# port for eventlet backdoor to listen (integer value)
+# Enable eventlet backdoor. Acceptable values are 0, <port>,
+# and <start>:<end>, where 0 results in listening on a random
+# tcp port number; <port> results in listening on the
+# specified port number (and not enabling backdoor if that
+# port is in use); and <start>:<end> results in listening on
+# the smallest unused port number within the specified range
+# of port numbers. The chosen port is displayed in the
+# service's log file. (string value)
#backdoor_port=<None>
#control_exchange=openstack
+#
+# Options defined in heat.openstack.common.rpc.amqp
+#
+
+# Use durable queues in amqp. (boolean value)
+#amqp_durable_queues=false
+
+# Auto-delete queues in amqp. (boolean value)
+#amqp_auto_delete=false
+
+
#
# Options defined in heat.openstack.common.rpc.impl_kombu
#
-# SSL version to use (valid only if SSL enabled) (string
-# value)
+# SSL version to use (valid only if SSL enabled). valid values
+# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
+# distributions (string value)
#kombu_ssl_version=
# SSL key file (valid only if SSL enabled) (string value)
# value)
#rabbit_max_retries=0
-# use durable queues in RabbitMQ (boolean value)
-#rabbit_durable_queues=false
-
# use H/A queues in RabbitMQ (x-ha-policy: all).You need to
# wipe RabbitMQ database when changing this option. (boolean
# value)
#matchmaker_heartbeat_ttl=600
+[ssl]
+
+#
+# Options defined in heat.openstack.common.sslutils
+#
+
+# CA certificate file to use to verify connecting clients
+# (string value)
+#ca_file=<None>
+
+# Certificate file to use when starting the server securely
+# (string value)
+#cert_file=<None>
+
+# Private key file to use when starting the server securely
+# (string value)
+#key_file=<None>
+
+
[paste_deploy]
#
#ringfile=/etc/oslo/matchmaker_ring.json
-# Total option count: 109
-#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 SINA Corporation
# License for the specific language governing permissions and limitations
# under the License.
#
-# @author: Zhongyue Luo, SINA Corporation.
-#
"""Extracts OpenStack config option info from module(s)."""
MULTISTROPT: 'multi valued',
}
-OPTION_COUNT = 0
OPTION_REGEX = re.compile(r"(%s)" % "|".join([STROPT, BOOLOPT, INTOPT,
FLOATOPT, LISTOPT,
MULTISTROPT]))
for group, opts in opts_by_group.items():
print_group_opts(group, opts)
- print("# Total option count: %d" % OPTION_COUNT)
-
def _import_module(mod_str):
try:
def print_group_opts(group, opts_by_module):
print("[%s]" % group)
print('')
- global OPTION_COUNT
for mod, opts in opts_by_module:
- OPTION_COUNT += len(opts)
print('#')
print('# Options defined in %s' % mod)
print('#')
return None
-def _sanitize_default(s):
+def _sanitize_default(name, value):
"""Set up a reasonably sensible default for pybasedir, my_ip and host."""
- if s.startswith(sys.prefix):
+ if value.startswith(sys.prefix):
# NOTE(jd) Don't use os.path.join, because it is likely to think the
# second part is an absolute pathname and therefore drop the first
# part.
- s = os.path.normpath("/usr/" + s[len(sys.prefix):])
- elif s.startswith(BASEDIR):
- return s.replace(BASEDIR, '/usr/lib/python/site-packages')
- elif BASEDIR in s:
- return s.replace(BASEDIR, '')
- elif s == _get_my_ip():
+ value = os.path.normpath("/usr/" + value[len(sys.prefix):])
+ elif value.startswith(BASEDIR):
+ return value.replace(BASEDIR, '/usr/lib/python/site-packages')
+ elif BASEDIR in value:
+ return value.replace(BASEDIR, '')
+ elif value == _get_my_ip():
return '10.0.0.1'
- elif s == socket.gethostname():
+ elif value == socket.gethostname() and 'host' in name:
return 'heat'
- elif s.strip() != s:
- return '"%s"' % s
- return s
+ elif value.strip() != value:
+ return '"%s"' % value
+ return value
def _print_opt(opt):
print('#%s=<None>' % opt_name)
elif opt_type == STROPT:
assert(isinstance(opt_default, basestring))
- print('#%s=%s' % (opt_name, _sanitize_default(opt_default)))
+ print('#%s=%s' % (opt_name, _sanitize_default(opt_name,
+ opt_default)))
elif opt_type == BOOLOPT:
assert(isinstance(opt_default, bool))
print('#%s=%s' % (opt_name, str(opt_default).lower()))
'request_id': self.request_id}
-def get_admin_context(show_deleted="no"):
+def get_admin_context(show_deleted=False):
context = RequestContext(None,
tenant=None,
is_admin=True,
from __future__ import print_function
+import errno
import gc
+import os
import pprint
+import socket
import sys
import traceback
import greenlet
from oslo.config import cfg
+from heat.openstack.common.gettextutils import _ # noqa
+from heat.openstack.common import log as logging
+
+help_for_backdoor_port = (
+ "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
+ "in listening on a random tcp port number; <port> results in listening "
+ "on the specified port number (and not enabling backdoor if that port "
+ "is in use); and <start>:<end> results in listening on the smallest "
+ "unused port number within the specified range of port numbers. The "
+ "chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
- cfg.IntOpt('backdoor_port',
+ cfg.StrOpt('backdoor_port',
default=None,
- help='port for eventlet backdoor to listen')
+ help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts)
+LOG = logging.getLogger(__name__)
+
+
+class EventletBackdoorConfigValueError(Exception):
+ def __init__(self, port_range, help_msg, ex):
+ msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
+ '%(help)s' %
+ {'range': port_range, 'ex': ex, 'help': help_msg})
+ super(EventletBackdoorConfigValueError, self).__init__(msg)
+ self.port_range = port_range
def _dont_use_this():
print()
+def _parse_port_range(port_range):
+ if ':' not in port_range:
+ start, end = port_range, port_range
+ else:
+ start, end = port_range.split(':', 1)
+ try:
+ start, end = int(start), int(end)
+ if end < start:
+ raise ValueError
+ return start, end
+ except ValueError as ex:
+ raise EventletBackdoorConfigValueError(port_range, ex,
+ help_for_backdoor_port)
+
+
+def _listen(host, start_port, end_port, listen_func):
+ try_port = start_port
+ while True:
+ try:
+ return listen_func((host, try_port))
+ except socket.error as exc:
+ if (exc.errno != errno.EADDRINUSE or
+ try_port >= end_port):
+ raise
+ try_port += 1
+
+
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
if CONF.backdoor_port is None:
return None
+ start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
+
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint
pprint.pprint(val)
sys.displayhook = displayhook
- sock = eventlet.listen(('localhost', CONF.backdoor_port))
+ sock = _listen('localhost', start_port, end_port, eventlet.listen)
+
+ # In the case of backdoor port being zero, a port number is assigned by
+ # listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
+ LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
+ {'port': port, 'pid': os.getpid()})
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port
from eventlet import event
from eventlet import greenthread
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging
from heat.openstack.common import timeutils
Network-related utilities and helper functions.
"""
-from heat.openstack.common import log as logging
-
-
-LOG = logging.getLogger(__name__)
+import urlparse
def parse_host_port(address, default_port=None):
port = default_port
return (host, None if port is None else int(port))
+
+
+def urlsplit(url, scheme='', allow_fragments=True):
+ """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
+ This function papers over Python issue9374 when needed.
+
+ The parameters are the same as urlparse.urlsplit.
+ """
+ scheme, netloc, path, query, fragment = urlparse.urlsplit(
+ url, scheme, allow_fragments)
+ if allow_fragments and '#' in path:
+ path, fragment = path.split('#', 1)
+ if '?' in path:
+ path, query = path.split('?', 1)
+ return urlparse.SplitResult(scheme, netloc, path, query, fragment)
if _drivers is None:
_drivers = {}
for notification_driver in CONF.notification_driver:
- add_driver(notification_driver)
-
+ try:
+ driver = importutils.import_module(notification_driver)
+ _drivers[notification_driver] = driver
+ except ImportError:
+ LOG.exception(_("Failed to load notifier %s. "
+ "These notifications will not be sent.") %
+ notification_driver)
return _drivers.values()
-def add_driver(notification_driver):
- """Add a notification driver at runtime."""
- # Make sure the driver list is initialized.
- _get_drivers()
- if isinstance(notification_driver, basestring):
- # Load and add
- try:
- driver = importutils.import_module(notification_driver)
- _drivers[notification_driver] = driver
- except ImportError:
- LOG.exception(_("Failed to load notifier %s. "
- "These notifications will not be sent.") %
- notification_driver)
- else:
- # Driver is already loaded; just add the object.
- _drivers[notification_driver] = notification_driver
-
-
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global _drivers
from oslo.config import cfg
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils
from heat.openstack.common import local
from heat.openstack.common import log as logging
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
+from oslo.config import cfg
from heat.openstack.common import excutils
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import local
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import common as rpc_common
+amqp_opts = [
+ cfg.BoolOpt('amqp_durable_queues',
+ default=False,
+ deprecated_name='rabbit_durable_queues',
+ deprecated_group='DEFAULT',
+ help='Use durable queues in amqp.'),
+ cfg.BoolOpt('amqp_auto_delete',
+ default=False,
+ help='Auto-delete queues in amqp.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+ ack_on_error=True):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
- exchange_name)
+ exchange_name,
+ ack_on_error)
def consume_in_thread(self):
self.connection.consume_in_thread()
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
- try:
- msg = {'result': reply, 'failure': failure}
- except TypeError:
- msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
_add_unique_id(msg)
from oslo.config import cfg
import six
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import local
class RPCException(Exception):
- message = _("An unknown RPC related exception occurred.")
+ msg_fmt = _("An unknown RPC related exception occurred.")
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if not message:
try:
- message = self.message % kwargs
+ message = self.msg_fmt % kwargs
except Exception:
# kwargs doesn't match a variable in the message
for name, value in kwargs.iteritems():
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
- message = self.message
+ message = self.msg_fmt
super(RPCException, self).__init__(message)
contains all of the relevant info.
"""
- message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+ msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _('Timeout while waiting on RPC response - '
+ msg_fmt = _('Timeout while waiting on RPC response - '
'topic: "%(topic)s", RPC method: "%(method)s" '
'info: "%(info)s"')
class DuplicateMessageError(RPCException):
- message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+ msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
- message = _("Invalid reuse of an RPC connection.")
+ msg_fmt = _("Invalid reuse of an RPC connection.")
class UnsupportedRpcVersion(RPCException):
- message = _("Specified RPC version, %(version)s, not supported by "
+ msg_fmt = _("Specified RPC version, %(version)s, not supported by "
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
- message = _("Specified RPC envelope version, %(version)s, "
+ msg_fmt = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class RpcVersionCapError(RPCException):
- message = _("Specified RPC version cap, %(version_cap)s, is too low")
+ msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': [('args', 'new_pass')],
- 'run_instance': [('args', 'admin_password')],
- 'route_message': [('args', 'message', 'args', 'method_info',
- 'method_kwargs', 'password'),
- ('args', 'message', 'args', 'method_info',
- 'method_kwargs', 'admin_password')]}
-
- has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
- has_context_token = '_context_auth_token' in msg_data
- has_token = 'auth_token' in msg_data
-
- if not any([has_method, has_context_token, has_token]):
- return log_func(msg, msg_data)
-
- msg_data = copy.deepcopy(msg_data)
-
- if has_method:
- for arg in SANITIZE.get(msg_data['method'], []):
- try:
- d = msg_data
- for elem in arg[:-1]:
- d = d[elem]
- d[arg[-1]] = '<SANITIZED>'
- except KeyError as e:
- LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
- {'item': arg,
- 'err': e})
-
- if has_context_token:
- msg_data['_context_auth_token'] = '<SANITIZED>'
-
- if has_token:
- msg_data['auth_token'] = '<SANITIZED>'
-
- return log_func(msg, msg_data)
+ SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
+
+ def _fix_passwords(d):
+ """Sanitizes the password fields in the dictionary."""
+ for k in d.iterkeys():
+ if k.lower().find('password') != -1:
+ d[k] = '<SANITIZED>'
+ elif k.lower() in SANITIZE:
+ d[k] = '<SANITIZED>'
+ elif isinstance(d[k], dict):
+ _fix_passwords(d[k])
+ return d
+
+ return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
def serialize_remote_exception(failure_info, log_failure=True):
import itertools
import socket
import ssl
-import sys
import time
import uuid
import kombu.messaging
from oslo.config import cfg
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common import excutils
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import network_utils
from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common
+from heat.openstack.common import sslutils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
- help='SSL version to use (valid only if SSL enabled)'),
+ help='SSL version to use (valid only if SSL enabled). '
+ 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+ 'be available on some distributions'
+ ),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
help='SSL key file (valid only if SSL enabled)'),
default=0,
help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'),
- cfg.BoolOpt('rabbit_durable_queues',
- default=False,
- help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
+ self.ack_on_error = kwargs.get('ack_on_error', True)
self.reconnect(channel)
def reconnect(self, channel):
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
+ def _callback_handler(self, message, callback):
+ """Call callback with deserialized message.
+
+ Messages that are processed without exception are ack'ed.
+
+ If the message processing generates an exception, it will be
+ ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
+ Rejection is better than waiting for the message to timeout.
+ Rejected messages are immediately requeued.
+ """
+
+ ack_msg = False
+ try:
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
+ ack_msg = True
+ except Exception:
+ if self.ack_on_error:
+ ack_msg = True
+ LOG.exception(_("Failed to process message"
+ " ... skipping it."))
+ else:
+ LOG.exception(_("Failed to process message"
+ " ... will requeue."))
+ finally:
+ if ack_msg:
+ message.ack()
+ else:
+ message.reject()
+
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
If kwargs['nowait'] is True, then this call will block until
a message is read.
- Messages will automatically be acked if the callback doesn't
- raise an exception
"""
options = {'consumer_tag': self.tag}
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
- try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
- except Exception:
- LOG.exception(_("Failed to process message... skipping it."))
- finally:
- message.ack()
+ self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
Other kombu options may be passed as keyword arguments
"""
# Default options
- options = {'durable': conf.rabbit_durable_queues,
+ options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
- 'auto_delete': False,
+ 'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
Kombu options may be passed as keyword args to override defaults
"""
- options = {'durable': conf.rabbit_durable_queues,
- 'auto_delete': False,
+ options = {'durable': conf.amqp_durable_queues,
+ 'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
- self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
- ssl_params['ssl_version'] = self.conf.kombu_ssl_version
+ ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+ self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
- if not ssl_params:
- # Just have the default behavior
- return True
- else:
- # Return the extended behavior
- return ssl_params
+ # Return the extended behavior or just have the default behavior
+ return ssl_params or True
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
- LOG.error(_('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info)
- # NOTE(comstud): Copied from original code. There's
- # really no better recourse because if this was a queue we
- # need to consume on, we have no way to consume anymore.
- sys.exit(1)
+ msg = _('Unable to connect to AMQP server on '
+ '%(hostname)s:%(port)d after %(max_retries)d '
+ 'tries: %(err_str)s') % log_info
+ LOG.error(msg)
+ raise rpc_common.RPCException(msg)
if attempt == 1:
sleep_time = self.interval_start or 1
def _consume():
if info['do_consume']:
- queues_head = self.consumers[:-1]
- queues_tail = self.consumers[-1]
+ queues_head = self.consumers[:-1] # not fanout.
+ queues_tail = self.consumers[-1] # fanout
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
+ ack_on_error=ack_on_error,
),
topic, callback)
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
+ ack_on_error=ack_on_error,
)
import greenlet
from oslo.config import cfg
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common import excutils
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
- self.reconnect(session)
+ self.connect(session)
+
+ def connect(self, session):
+ """Declare the reciever on connect."""
+ self._declare_receiver(session)
def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect."""
+ self._declare_receiver(session)
+
+ def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
+ # TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
+ def get_node_name(self):
+ return self.address.split(';')[0]
+
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
'callback' is the callback to call when messages are received
"""
- super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ super(DirectConsumer, self).__init__(
+ session, callback,
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {
+ "auto-delete": conf.amqp_auto_delete,
+ "exclusive": True,
+ "durable": conf.amqp_durable_queues,
+ })
class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (exchange_name, topic),
- {}, name or topic, {})
+ super(TopicConsumer, self).__init__(
+ session, callback,
+ "%s/%s" % (exchange_name, topic),
+ {}, name or topic,
+ {
+ "auto-delete": conf.amqp_auto_delete,
+ "durable": conf.amqp_durable_queues,
+ })
class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
+ self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
+ def reconnect(self, session):
+ topic = self.get_node_name().rpartition('_fanout')[0]
+ params = {
+ 'session': session,
+ 'topic': topic,
+ 'callback': self.callback,
+ }
+
+ self.__init__(conf=self.conf, **params)
+
+ super(FanoutConsumer, self).reconnect(session)
+
class Publisher(object):
"""Base Publisher class."""
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread."""
+ @excutils.forever_retry_uncaught_exceptions
def _consumer_thread():
try:
self.consume()
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None):
+ exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
from oslo.config import cfg
from heat.openstack.common import excutils
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common.rpc import common as rpc_common
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
- self.mapping = {}
self.proxies = {}
self.threads = []
self.sockets = []
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
- def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
- zmq_type_out=None, in_bind=True, out_bind=True,
- subscribe=None):
+ def register(self, proxy, in_addr, zmq_type_in,
+ in_bind=True, subscribe=None):
LOG.info(_("Registering reactor"))
LOG.info(_("In reactor registered"))
- if not out_addr:
- return
-
- if zmq_type_out not in (zmq.PUSH, zmq.PUB):
- raise RPCException("Bad output socktype")
-
- # Items push out.
- outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
-
- self.mapping[inq] = outq
- self.mapping[outq] = inq
- self.sockets.append(outq)
-
- LOG.info(_("Out reactor registered"))
-
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
try:
self.register(consumption_proxy,
consume_in,
- zmq.PULL,
- out_bind=True)
+ zmq.PULL)
except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
- if sock in self.mapping:
- LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
- 'data': data})
- self.mapping[sock].send(data)
- return
proxy = self.proxies[sock]
import eventlet
from oslo.config import cfg
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging
that it maps directly to a host, thus direct.
"""
def test(self, key):
- if '.' in key:
- return True
- return False
+ return '.' in key
class TopicBinding(Binding):
matches that of a direct exchange.
"""
def test(self, key):
- if '.' not in key:
- return True
- return False
+ return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
- if key.startswith('fanout~'):
- return True
- return False
+ return key.startswith('fanout~')
class StubExchange(Exchange):
from oslo.config import cfg
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import matchmaker as mm
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
- if key in self.ring0:
- return True
- return False
+ return key in self.ring0
class RoundRobinRingExchange(RingExchange):
v = vers if vers else self.default_version
if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)):
- raise rpc_common.RpcVersionCapError(version=self.version_cap)
+ raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v
def _get_topic(self, topic):
# License for the specific language governing permissions and limitations
# under the License.
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import log as logging
from heat.openstack.common import rpc
from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
A service enables rpc by listening to queues based on topic and host.
"""
- def __init__(self, host, topic, manager=None):
+ def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
+ self.serializer = serializer
if manager is None:
self.manager = self
else:
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
- dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+ dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
+ self.serializer)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
import time
import eventlet
+from eventlet import event
import logging as std_logging
from oslo.config import cfg
from heat.openstack.common import eventlet_backdoor
-from heat.openstack.common.gettextutils import _
+from heat.openstack.common.gettextutils import _ # noqa
from heat.openstack.common import importutils
from heat.openstack.common import log as logging
from heat.openstack.common import threadgroup
:returns: None
"""
- self._services = threadgroup.ThreadGroup()
+ self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- @staticmethod
- def run_service(service):
- """Start and wait for a service to finish.
-
- :param service: service to run and wait for.
- :returns: None
-
- """
- service.start()
- service.wait()
-
def launch_service(self, service):
"""Load and start the given service.
"""
service.backdoor_port = self.backdoor_port
- self._services.add_thread(self.run_service, service)
+ self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
- self._services.stop()
+ self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
- self._services.wait()
+ self.services.wait()
+
+ def restart(self):
+ """Reload config files and restart service.
+
+ :returns: None
+
+ """
+ cfg.CONF.reload_config_files()
+ self.services.restart()
class SignalExit(SystemExit):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signo)
- def wait(self):
+ def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
+ signal.signal(signal.SIGHUP, self._handle_signal)
+
+ def _wait_for_exit_or_signal(self):
+ status = None
+ signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
- status = None
try:
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
+ signal.SIGINT: 'SIGINT',
+ signal.SIGHUP: 'SIGHUP'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
+ signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
- if rpc:
- rpc.cleanup()
self.stop()
- return status
+ if rpc:
+ try:
+ rpc.cleanup()
+ except Exception:
+ # We're shutting down, so it doesn't matter at this point.
+ LOG.exception(_('Exception during rpc cleanup.'))
+
+ return status, signo
+
+ def wait(self):
+ while True:
+ self.handle_signal()
+ status, signo = self._wait_for_exit_or_signal()
+ if signo != signal.SIGHUP:
+ return status
+ self.restart()
class ServiceWrapper(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+ self.handle_signal()
+ def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
+ signal.signal(signal.SIGHUP, self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
sys.exit(1)
- def _child_process(self, service):
+ def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
+ def _sighup(*args):
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ raise SignalExit(signal.SIGHUP)
+
signal.signal(signal.SIGTERM, _sigterm)
+ signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
+ def _child_wait_for_exit_or_signal(self, launcher):
+ status = None
+ signo = 0
+
+ try:
+ launcher.wait()
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT',
+ signal.SIGHUP: 'SIGHUP'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ signo = exc.signo
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ launcher.stop()
+
+ return status, signo
+
+ def _child_process(self, service):
+ self._child_process_handle_signal()
+
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
random.seed()
launcher = Launcher()
- launcher.run_service(service)
+ launcher.launch_service(service)
+ return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
- status = 0
- try:
- self._child_process(wrap.service)
- except SignalExit as exc:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[exc.signo]
- LOG.info(_('Caught %s, exiting'), signame)
- status = exc.code
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_('Unhandled exception'))
- status = 2
- finally:
- wrap.service.stop()
+ launcher = self._child_process(wrap.service)
+ while True:
+ self._child_process_handle_signal()
+ status, signo = self._child_wait_for_exit_or_signal(launcher)
+ if signo != signal.SIGHUP:
+ break
+ launcher.restart()
os._exit(status)
wrap.children.remove(pid)
return wrap
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
-
- LOG.debug(_('Full set of CONF:'))
- CONF.log_opt_values(LOG, std_logging.DEBUG)
-
+ def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
-
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
- if self.sigcaught:
- signame = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}[self.sigcaught]
- LOG.info(_('Caught %s, stopping children'), signame)
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary."""
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ while True:
+ self.handle_signal()
+ self._respawn_children()
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT',
+ signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+ if self.sigcaught != signal.SIGHUP:
+ break
+
+ for pid in self.children:
+ os.kill(pid, signal.SIGHUP)
+ self.running = True
+ self.sigcaught = None
for pid in self.children:
try:
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
+ # signal that the service is done shutting itself down:
+ self._done = event.Event()
+
+ def reset(self):
+ # NOTE(Fengqian): docs for Event.reset() recommend against using it
+ self._done = event.Event()
+
def start(self):
pass
def stop(self):
self.tg.stop()
+ self.tg.wait()
+ # Signal that service cleanup is done:
+ if not self._done.ready():
+ self._done.send()
+
+ def wait(self):
+ self._done.wait()
+
+
+class Services(object):
+
+ def __init__(self):
+ self.services = []
+ self.tg = threadgroup.ThreadGroup()
+ self.done = event.Event()
+
+ def add(self, service):
+ self.services.append(service)
+ self.tg.add_thread(self.run_service, service, self.done)
+
+ def stop(self):
+ # wait for graceful shutdown of services:
+ for service in self.services:
+ service.stop()
+ service.wait()
+
+ # Each service has performed cleanup, now signal that the run_service
+ # wrapper threads can now die:
+ if not self.done.ready():
+ self.done.send()
+
+ # reap threads:
+ self.tg.stop()
def wait(self):
self.tg.wait()
+ def restart(self):
+ self.stop()
+ self.done = event.Event()
+ for restart_service in self.services:
+ restart_service.reset()
+ self.tg.add_thread(self.run_service, restart_service, self.done)
+
+ @staticmethod
+ def run_service(service, done):
+ """Service start wrapper.
+
+ :param service: service to run
+ :param done: event to wait on until a shutdown is triggered
+ :returns: None
+
+ """
+ service.start()
+ done.wait()
+
def launch(service, workers=None):
if workers:
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import ssl
+
+from oslo.config import cfg
+
+from heat.openstack.common.gettextutils import _ # noqa
+
+
+ssl_opts = [
+ cfg.StrOpt('ca_file',
+ default=None,
+ help="CA certificate file to use to verify "
+ "connecting clients"),
+ cfg.StrOpt('cert_file',
+ default=None,
+ help="Certificate file to use when starting "
+ "the server securely"),
+ cfg.StrOpt('key_file',
+ default=None,
+ help="Private key file to use when starting "
+ "the server securely"),
+]
+
+
+CONF = cfg.CONF
+CONF.register_opts(ssl_opts, "ssl")
+
+
+def is_enabled():
+ cert_file = CONF.ssl.cert_file
+ key_file = CONF.ssl.key_file
+ ca_file = CONF.ssl.ca_file
+ use_ssl = cert_file or key_file
+
+ if cert_file and not os.path.exists(cert_file):
+ raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
+
+ if ca_file and not os.path.exists(ca_file):
+ raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
+
+ if key_file and not os.path.exists(key_file):
+ raise RuntimeError(_("Unable to find key_file : %s") % key_file)
+
+ if use_ssl and (not cert_file or not key_file):
+ raise RuntimeError(_("When running server in SSL mode, you must "
+ "specify both a cert_file and key_file "
+ "option value in your configuration file"))
+
+ return use_ssl
+
+
+def wrap(sock):
+ ssl_kwargs = {
+ 'server_side': True,
+ 'certfile': CONF.ssl.cert_file,
+ 'keyfile': CONF.ssl.key_file,
+ 'cert_reqs': ssl.CERT_NONE,
+ }
+
+ if CONF.ssl.ca_file:
+ ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
+ ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
+
+ return ssl.wrap_socket(sock, **ssl_kwargs)
+
+
+_SSL_PROTOCOLS = {
+ "tlsv1": ssl.PROTOCOL_TLSv1,
+ "sslv23": ssl.PROTOCOL_SSLv23,
+ "sslv3": ssl.PROTOCOL_SSLv3
+}
+
+try:
+ _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
+except AttributeError:
+ pass
+
+
+def validate_ssl_version(version):
+ key = version.lower()
+ try:
+ return _SSL_PROTOCOLS[key]
+ except KeyError:
+ raise RuntimeError(_("Invalid SSL version : %s") % version)
# License for the specific language governing permissions and limitations
# under the License.
-from eventlet import greenlet
+import eventlet
from eventlet import greenpool
from eventlet import greenthread
for x in self.timers:
try:
x.wait()
- except greenlet.GreenletExit:
+ except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
continue
try:
x.wait()
- except greenlet.GreenletExit:
+ except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
- raise ValueError(e.message)
+ raise ValueError(unicode(e))
except TypeError as e:
- raise ValueError(e.message)
+ raise ValueError(unicode(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
--- /dev/null
+#!/usr/bin/env bash
+
+print_hint() {
+ echo "Try \`${0##*/} --help' for more information." >&2
+}
+
+PARSED_OPTIONS=$(getopt -n "${0##*/}" -o hb:p:o: \
+ --long help,base-dir:,package-name:,output-dir: -- "$@")
+
+if [ $? != 0 ] ; then print_hint ; exit 1 ; fi
+
+eval set -- "$PARSED_OPTIONS"
+
+while true; do
+ case "$1" in
+ -h|--help)
+ echo "${0##*/} [options]"
+ echo ""
+ echo "options:"
+ echo "-h, --help show brief help"
+ echo "-b, --base-dir=DIR Project base directory (required)"
+ echo "-p, --package-name=NAME Project package name"
+ echo "-o, --output-dir=DIR File output directory"
+ exit 0
+ ;;
+ -b|--base-dir)
+ shift
+ BASEDIR=`echo $1 | sed -e 's/\/*$//g'`
+ shift
+ ;;
+ -p|--package-name)
+ shift
+ PACKAGENAME=`echo $1`
+ shift
+ ;;
+ -o|--output-dir)
+ shift
+ OUTPUTDIR=`echo $1 | sed -e 's/\/*$//g'`
+ shift
+ ;;
+ --)
+ break
+ ;;
+ esac
+done
+
+if [ -z $BASEDIR ] || ! [ -d $BASEDIR ]
+then
+ echo "${0##*/}: missing project base directory" >&2 ; print_hint ; exit 1
+fi
+
+PACKAGENAME=${PACKAGENAME:-${BASEDIR##*/}}
+
+OUTPUTDIR=${OUTPUTDIR:-$BASEDIR/etc}
+if ! [ -d $OUTPUTDIR ]
+then
+ echo "${0##*/}: cannot access \`$OUTPUTDIR': No such file or directory" >&2
+ exit 1
+fi
+
+BASEDIRESC=`echo $BASEDIR | sed -e 's/\//\\\\\//g'`
+FILES=$(find $BASEDIR/$PACKAGENAME -type f -name "*.py" ! -path "*/tests/*" \
+ -exec grep -l "Opt(" {} + | sed -e "s/^$BASEDIRESC\///g" | sort -u)
+
+export EVENTLET_NO_GREENDNS=yes
+
+MODULEPATH=heat.openstack.common.config.generator
+OUTPUTFILE=$OUTPUTDIR/$PACKAGENAME.conf.sample
+python -m $MODULEPATH $FILES > $OUTPUTFILE
import os
import sys
-import install_venv_common as install_venv
+import install_venv_common as install_venv # noqa
def first_file(file_list):