--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 OpenStack Foundation.
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gc
+import pprint
+import sys
+import traceback
+
+import eventlet
+import eventlet.backdoor
+import greenlet
+from oslo.config import cfg
+
+eventlet_backdoor_opts = [
+ cfg.IntOpt('backdoor_port',
+ default=None,
+ help='port for eventlet backdoor to listen')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(eventlet_backdoor_opts)
+
+
+def _dont_use_this():
+ print "Don't use this, just disconnect instead"
+
+
+def _find_objects(t):
+ return filter(lambda o: isinstance(o, t), gc.get_objects())
+
+
+def _print_greenthreads():
+ for i, gt in enumerate(_find_objects(greenlet.greenlet)):
+ print i, gt
+ traceback.print_stack(gt.gr_frame)
+ print
+
+
+def _print_nativethreads():
+ for threadId, stack in sys._current_frames().items():
+ print threadId
+ traceback.print_stack(stack)
+ print
+
+
+def initialize_if_enabled():
+ backdoor_locals = {
+ 'exit': _dont_use_this, # So we don't exit the entire process
+ 'quit': _dont_use_this, # So we don't exit the entire process
+ 'fo': _find_objects,
+ 'pgt': _print_greenthreads,
+ 'pnt': _print_nativethreads,
+ }
+
+ if CONF.backdoor_port is None:
+ return None
+
+ # NOTE(johannes): The standard sys.displayhook will print the value of
+ # the last expression and set it to __builtin__._, which overwrites
+ # the __builtin__._ that gettext sets. Let's switch to using pprint
+ # since it won't interact poorly with gettext, and it's easier to
+ # read the output too.
+ def displayhook(val):
+ if val is not None:
+ pprint.pprint(val)
+ sys.displayhook = displayhook
+
+ sock = eventlet.listen(('localhost', CONF.backdoor_port))
+ port = sock.getsockname()[1]
+ eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
+ locals=backdoor_locals)
+ return port
"""
import gettext
+import os
-
-t = gettext.translation('openstack-common', 'locale', fallback=True)
+_localedir = os.environ.get('cinder'.upper() + '_LOCALEDIR')
+_t = gettext.translation('cinder', localedir=_localedir, fallback=True)
def _(msg):
- return t.ugettext(msg)
+ return _t.ugettext(msg)
+
+
+def install(domain):
+ """Install a _() function using the given translation domain.
+
+ Given a translation domain, install a _() function using gettext's
+ install() function.
+
+ The main difference from gettext.install() is that we allow
+ overriding the default localedir (e.g. /usr/share/locale) using
+ a translation-domain-specific environment variable (e.g.
+ NOVA_LOCALEDIR).
+ """
+ gettext.install(domain,
+ localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
+ unicode=True)
import inspect
import itertools
import json
+import types
import xmlrpclib
from cinder.openstack.common import timeutils
+_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+ inspect.isfunction, inspect.isgeneratorfunction,
+ inspect.isgenerator, inspect.istraceback, inspect.isframe,
+ inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+ inspect.isabstract]
+
+_simple_types = (types.NoneType, int, basestring, bool, float, long)
+
+
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives.
Therefore, convert_instances=True is lossy ... be aware.
"""
- nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
- inspect.isfunction, inspect.isgeneratorfunction,
- inspect.isgenerator, inspect.istraceback, inspect.isframe,
- inspect.iscode, inspect.isbuiltin, inspect.isroutine,
- inspect.isabstract]
- for test in nasty:
- if test(value):
- return unicode(value)
-
- # value of itertools.count doesn't get caught by inspects
- # above and results in infinite loop when list(value) is called.
+ # handle obvious types first - order of basic types determined by running
+ # full tests on nova project, resulting in the following counts:
+ # 572754 <type 'NoneType'>
+ # 460353 <type 'int'>
+ # 379632 <type 'unicode'>
+ # 274610 <type 'str'>
+ # 199918 <type 'dict'>
+ # 114200 <type 'datetime.datetime'>
+ # 51817 <type 'bool'>
+ # 26164 <type 'list'>
+ # 6491 <type 'float'>
+ # 283 <type 'tuple'>
+ # 19 <type 'long'>
+ if isinstance(value, _simple_types):
+ return value
+
+ if isinstance(value, datetime.datetime):
+ if convert_datetime:
+ return timeutils.strtime(value)
+ else:
+ return value
+
+ # value of itertools.count doesn't get caught by nasty_type_tests
+ # and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
return unicode(value)
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
+ if isinstance(value, dict):
+ return dict((k, recursive(v)) for k, v in value.iteritems())
+ elif isinstance(value, (list, tuple)):
+ return [recursive(lv) for lv in value]
+
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
- if isinstance(value, (list, tuple)):
- return [recursive(v) for v in value]
- elif isinstance(value, dict):
- return dict((k, recursive(v)) for k, v in value.iteritems())
- elif convert_datetime and isinstance(value, datetime.datetime):
+ if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
else:
+ if any(test(value) for test in _nasty_type_tests):
+ return unicode(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
"""
+import ConfigParser
import cStringIO
import inspect
import itertools
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
- 'If not set, logging will go to stdout.'),
+ 'If no default is set, logging will go to stdout.'),
cfg.StrOpt('log-dir',
deprecated_name='logdir',
- help='(Optional) The directory to keep log files in '
- '(will be prepended to --log-file)'),
+ help='(Optional) The base directory used for relative '
+ '--log-file paths'),
cfg.BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
- '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
- '%(message)s',
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+ '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+ '%(instance)s%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
return logging_excepthook
+class LogConfigError(Exception):
+
+ message = _('Error loading logging config %(log_config)s: %(err_msg)s')
+
+ def __init__(self, log_config, err_msg):
+ self.log_config = log_config
+ self.err_msg = err_msg
+
+ def __str__(self):
+ return self.message % dict(log_config=self.log_config,
+ err_msg=self.err_msg)
+
+
+def _load_log_config(log_config):
+ try:
+ logging.config.fileConfig(log_config)
+ except ConfigParser.Error, exc:
+ raise LogConfigError(log_config, str(exc))
+
+
def setup(product_name):
"""Setup logging."""
if CONF.log_config:
- logging.config.fileConfig(CONF.log_config)
+ _load_log_config(CONF.log_config)
else:
_setup_logging_from_conf()
sys.excepthook = _create_logging_excepthook(product_name)
else:
log_root.setLevel(logging.WARNING)
- level = logging.NOTSET
for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod)
logger.setLevel(level)
- for handler in log_root.handlers:
- logger.addHandler(handler)
_loggers = {}
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from eventlet import event
+from eventlet import greenthread
+
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
+
+LOG = logging.getLogger(__name__)
+
+
+class LoopingCallDone(Exception):
+ """Exception to break out and stop a LoopingCall.
+
+ The poll-function passed to LoopingCall can raise this exception to
+ break out of the loop normally. This is somewhat analogous to
+ StopIteration.
+
+ An optional return-value can be included as the argument to the exception;
+ this return-value will be returned by LoopingCall.wait()
+
+ """
+
+ def __init__(self, retvalue=True):
+ """:param retvalue: Value that LoopingCall.wait() should return."""
+ self.retvalue = retvalue
+
+
+class LoopingCallBase(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._running = False
+ self.done = None
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
+
+
+class FixedIntervalLoopingCall(LoopingCallBase):
+ """A fixed interval looping call."""
+
+ def start(self, interval, initial_delay=None):
+ self._running = True
+ done = event.Event()
+
+ def _inner():
+ if initial_delay:
+ greenthread.sleep(initial_delay)
+
+ try:
+ while self._running:
+ start = timeutils.utcnow()
+ self.f(*self.args, **self.kw)
+ end = timeutils.utcnow()
+ if not self._running:
+ break
+ delay = interval - timeutils.delta_seconds(start, end)
+ if delay <= 0:
+ LOG.warn(_('task run outlasted interval by %s sec') %
+ -delay)
+ greenthread.sleep(delay if delay > 0 else 0)
+ except LoopingCallDone, e:
+ self.stop()
+ done.send(e.retvalue)
+ except Exception:
+ LOG.exception(_('in fixed duration looping call'))
+ done.send_exception(*sys.exc_info())
+ return
+ else:
+ done.send(True)
+
+ self.done = done
+
+ greenthread.spawn_n(_inner)
+ return self.done
+
+
+# TODO(mikal): this class name is deprecated in Havana and should be removed
+# in the I release
+LoopingCall = FixedIntervalLoopingCall
+
+
+class DynamicLoopingCall(LoopingCallBase):
+ """A looping call which sleeps until the next known event.
+
+ The function called should return how long to sleep for before being
+ called again.
+ """
+
+ def start(self, initial_delay=None, periodic_interval_max=None):
+ self._running = True
+ done = event.Event()
+
+ def _inner():
+ if initial_delay:
+ greenthread.sleep(initial_delay)
+
+ try:
+ while self._running:
+ idle = self.f(*self.args, **self.kw)
+ if not self._running:
+ break
+
+ if periodic_interval_max is not None:
+ idle = min(idle, periodic_interval_max)
+ LOG.debug(_('Dynamic looping call sleeping for %.02f '
+ 'seconds'), idle)
+ greenthread.sleep(idle)
+ except LoopingCallDone, e:
+ self.stop()
+ done.send(e.retvalue)
+ except Exception:
+ LOG.exception(_('in dynamic looping call'))
+ done.send_exception(*sys.exc_info())
+ return
+ else:
+ done.send(True)
+
+ self.done = done
+
+ greenthread.spawn(_inner)
+ return self.done
notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
- deprecated_name='list_notifier_drivers',
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',
--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Root wrapper for OpenStack services
+
+ Filters which commands a service is allowed to run as another user.
+
+ To use this with cinder, you should set the following in
+ cinder.conf:
+ rootwrap_config=/etc/cinder/rootwrap.conf
+
+ You also need to let the cinder user run cinder-rootwrap
+ as root in sudoers:
+ cinder ALL = (root) NOPASSWD: /usr/bin/cinder-rootwrap
+ /etc/cinder/rootwrap.conf *
+
+ Service packaging should deploy .filters files only on nodes where
+ they are needed, to avoid allowing more than is necessary.
+"""
+
+import ConfigParser
+import logging
+import os
+import pwd
+import signal
+import subprocess
+import sys
+
+
+RC_UNAUTHORIZED = 99
+RC_NOCOMMAND = 98
+RC_BADCONFIG = 97
+RC_NOEXECFOUND = 96
+
+
+def _subprocess_setup():
+ # Python installs a SIGPIPE handler by default. This is usually not what
+ # non-Python subprocesses expect.
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def _exit_error(execname, message, errorcode, log=True):
+ print "%s: %s" % (execname, message)
+ if log:
+ logging.error(message)
+ sys.exit(errorcode)
+
+
+def main():
+ # Split arguments, require at least a command
+ execname = sys.argv.pop(0)
+ if len(sys.argv) < 2:
+ _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
+
+ configfile = sys.argv.pop(0)
+ userargs = sys.argv[:]
+
+ # Add ../ to sys.path to allow running from branch
+ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
+ os.pardir, os.pardir))
+ if os.path.exists(os.path.join(possible_topdir, "cinder", "__init__.py")):
+ sys.path.insert(0, possible_topdir)
+
+ from cinder.openstack.common.rootwrap import wrapper
+
+ # Load configuration
+ try:
+ rawconfig = ConfigParser.RawConfigParser()
+ rawconfig.read(configfile)
+ config = wrapper.RootwrapConfig(rawconfig)
+ except ValueError as exc:
+ msg = "Incorrect value in %s: %s" % (configfile, exc.message)
+ _exit_error(execname, msg, RC_BADCONFIG, log=False)
+ except ConfigParser.Error:
+ _exit_error(execname, "Incorrect configuration file: %s" % configfile,
+ RC_BADCONFIG, log=False)
+
+ if config.use_syslog:
+ wrapper.setup_syslog(execname,
+ config.syslog_log_facility,
+ config.syslog_log_level)
+
+ # Execute command if it matches any of the loaded filters
+ filters = wrapper.load_filters(config.filters_path)
+ try:
+ filtermatch = wrapper.match_filter(filters, userargs,
+ exec_dirs=config.exec_dirs)
+ if filtermatch:
+ command = filtermatch.get_command(userargs,
+ exec_dirs=config.exec_dirs)
+ if config.use_syslog:
+ logging.info("(%s > %s) Executing %s (filter match = %s)" % (
+ os.getlogin(), pwd.getpwuid(os.getuid())[0],
+ command, filtermatch.name))
+
+ obj = subprocess.Popen(command,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ preexec_fn=_subprocess_setup,
+ env=filtermatch.get_environment(userargs))
+ obj.wait()
+ sys.exit(obj.returncode)
+
+ except wrapper.FilterMatchNotExecutable as exc:
+ msg = ("Executable not found: %s (filter match = %s)"
+ % (exc.match.exec_path, exc.match.name))
+ _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
+
+ except wrapper.NoFilterMatched:
+ msg = ("Unauthorized command: %s (no filter matched)"
+ % ' '.join(userargs))
+ _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
return False
+class PathFilter(CommandFilter):
+ """Command filter checking that path arguments are within given dirs
+
+ One can specify the following constraints for command arguments:
+ 1) pass - pass an argument as is to the resulting command
+ 2) some_str - check if an argument is equal to the given string
+ 3) abs path - check if a path argument is within the given base dir
+
+ A typical rootwrapper filter entry looks like this:
+ # cmdname: filter name, raw command, user, arg_i_constraint [, ...]
+ chown: PathFilter, /bin/chown, root, nova, /var/lib/images
+
+ """
+
+ def match(self, userargs):
+ command, arguments = userargs[0], userargs[1:]
+
+ equal_args_num = len(self.args) == len(arguments)
+ exec_is_valid = super(PathFilter, self).match(userargs)
+ args_equal_or_pass = all(
+ arg == 'pass' or arg == value
+ for arg, value in zip(self.args, arguments)
+ if not os.path.isabs(arg) # arguments not specifying abs paths
+ )
+ paths_are_within_base_dirs = all(
+ os.path.commonprefix([arg, os.path.realpath(value)]) == arg
+ for arg, value in zip(self.args, arguments)
+ if os.path.isabs(arg) # arguments specifying abs paths
+ )
+
+ return (equal_args_num and
+ exec_is_valid and
+ args_equal_or_pass and
+ paths_are_within_base_dirs)
+
+ def get_command(self, userargs, exec_dirs=[]):
+ command, arguments = userargs[0], userargs[1:]
+
+ # convert path values to canonical ones; copy other args as is
+ args = [os.path.realpath(value) if os.path.isabs(arg) else value
+ for arg, value in zip(self.args, arguments)]
+
+ return super(PathFilter, self).get_command([command] + args,
+ exec_dirs)
+
+
class DnsmasqFilter(CommandFilter):
"""Specific filter for the dnsmasq call (which includes env)"""
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
- version = message_data.get('version', None)
+ version = message_data.get('version')
+ namespace = message_data.get('namespace')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method,
+ namespace, args)
- def _process_data(self, ctxt, version, method, args):
+ def _process_data(self, ctxt, version, method, namespace, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
"""
ctxt.update_store()
try:
- rval = self.proxy.dispatch(ctxt, version, method, **args)
+ rval = self.proxy.dispatch(ctxt, version, method, namespace,
+ **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
- LOG.exception(_('Timed out waiting for RPC response.'))
self.done()
raise rpc_common.Timeout()
except Exception:
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
- msg = rpc_common.serialize_msg(msg, force_envelope=True)
+ msg = rpc_common.serialize_msg(msg)
conn.notify_send(topic, msg)
_MESSAGE_KEY = 'oslo.message'
-# TODO(russellb) Turn this on after Grizzly.
-_SEND_RPC_ENVELOPE = False
-
-
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _("Timeout while waiting on RPC response.")
+ message = _('Timeout while waiting on RPC response - '
+ 'topic: "%(topic)s", RPC method: "%(method)s" '
+ 'info: "%(info)s"')
+
+ def __init__(self, info=None, topic=None, method=None):
+ """
+ :param info: Extra info to convey to the user
+ :param topic: The topic that the rpc call was sent to
+ :param rpc_method_name: The name of the rpc method being
+ called
+ """
+ self.info = info
+ self.topic = topic
+ self.method = method
+ super(Timeout, self).__init__(
+ None,
+ info=info or _('<unknown>'),
+ topic=topic or _('<unknown>'),
+ method=method or _('<unknown>'))
class DuplicateMessageError(RPCException):
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
- failure = klass(**failure.get('kwargs', {}))
+ failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
return True
-def serialize_msg(raw_msg, force_envelope=False):
- if not _SEND_RPC_ENVELOPE and not force_envelope:
- return raw_msg
-
+def serialize_msg(raw_msg):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- def dispatch(self, ctxt, version, method, **kwargs):
+ def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
:param ctxt: The request context
:param version: The requested API version from the incoming message
:param method: The method requested to be called by the incoming
message.
+ :param namespace: The namespace for the requested method. If None,
+ the dispatcher will look for a method on a callback
+ object with no namespace set.
:param kwargs: A dict of keyword arguments to be passed to the method.
:returns: Whatever is returned by the underlying method that gets
had_compatible = False
for proxyobj in self.callbacks:
- if hasattr(proxyobj, 'RPC_API_VERSION'):
+ # Check for namespace compatibility
+ try:
+ cb_namespace = proxyobj.RPC_API_NAMESPACE
+ except AttributeError:
+ cb_namespace = None
+
+ if namespace != cb_namespace:
+ continue
+
+ # Check for version compatibility
+ try:
rpc_api_version = proxyobj.RPC_API_VERSION
- else:
+ except AttributeError:
rpc_api_version = '1.0'
+
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
+
if not hasattr(proxyobj, method):
continue
if is_compatible:
self.topic = topic
self.proxy = proxy
- def call(self, context, version, method, args, timeout):
+ def call(self, context, version, method, namespace, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = self.proxy.dispatch(context, version, method, **args)
+ rval = self.proxy.dispatch(context, version, method,
+ namespace, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, version, method, args, timeout)
+ return consumer.call(context, version, method, namespace, args,
+ timeout)
def call(conf, context, topic, msg, timeout=None):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, version, method, args, None)
+ consumer.call(context, version, method, namespace, args, None)
except Exception:
pass
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
- cfg.StrOpt('qpid_port',
- default='5672',
+ cfg.IntOpt('qpid_port',
+ default=5672,
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
- self.connection.protocol = self.conf.qpid_protocol
+ self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _register_consumer(self, consumer):
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ if not envelope:
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
try:
result = proxy.dispatch(
- ctx, data['version'], data['method'], **data['args'])
+ ctx, data['version'], data['method'],
+ data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
- # Our real method is curried into msg['args']
+ # NOTE(ewindisch): context kwarg exists for Grizzly compat.
+ # this may be able to be removed earlier than
+ # 'I' if ConsumerBase.process were refactored.
+ if type(msg) is list:
+ payload = msg[-1]
+ else:
+ payload = msg
- child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
- self._get_response(child_ctx, proxy, topic, msg[1]),
+ self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
return
proxy.dispatch(ctx, data['version'],
- data['method'], **data['args'])
+ data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
'method': '-reply',
'args': {
'msg_id': msg_id,
- 'context': mcontext,
'topic': reply_topic,
+ # TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}
"""Return the topic to use for a message."""
return topic if topic else self.topic
+ @staticmethod
+ def make_namespaced_msg(method, namespace, **kwargs):
+ return {'method': method, 'namespace': namespace, 'args': kwargs}
+
@staticmethod
def make_msg(method, **kwargs):
- return {'method': method, 'args': kwargs}
+ return RpcProxy.make_namespaced_msg(method, None, **kwargs)
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
- return rpc.call(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.call(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
- return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.multicall(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.
--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import sys
+
+from oslo.config import cfg
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import rpc
+from cinder.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+ CONF(sys.argv[1:], project='oslo')
+ logging.setup("oslo")
+
+ with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+ reactor.consume_in_thread()
+ reactor.wait()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Generic Node base class for all workers that run on hosts."""
+
+import errno
+import os
+import random
+import signal
+import sys
+import time
+
+import eventlet
+import logging as std_logging
+from oslo.config import cfg
+
+from cinder.openstack.common import eventlet_backdoor
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import threadgroup
+
+
+rpc = importutils.try_import('cinder.openstack.common.rpc')
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class Launcher(object):
+ """Launch one or more services and wait for them to complete."""
+
+ def __init__(self):
+ """Initialize the service launcher.
+
+ :returns: None
+
+ """
+ self._services = threadgroup.ThreadGroup()
+ eventlet_backdoor.initialize_if_enabled()
+
+ @staticmethod
+ def run_service(service):
+ """Start and wait for a service to finish.
+
+ :param service: service to run and wait for.
+ :returns: None
+
+ """
+ service.start()
+ service.wait()
+
+ def launch_service(self, service):
+ """Load and start the given service.
+
+ :param service: The service you would like to start.
+ :returns: None
+
+ """
+ self._services.add_thread(self.run_service, service)
+
+ def stop(self):
+ """Stop all services which are currently running.
+
+ :returns: None
+
+ """
+ self._services.stop()
+
+ def wait(self):
+ """Waits until all services have been stopped, and then returns.
+
+ :returns: None
+
+ """
+ self._services.wait()
+
+
+class SignalExit(SystemExit):
+ def __init__(self, signo, exccode=1):
+ super(SignalExit, self).__init__(exccode)
+ self.signo = signo
+
+
+class ServiceLauncher(Launcher):
+ def _handle_signal(self, signo, frame):
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ raise SignalExit(signo)
+
+ def wait(self):
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ status = None
+ try:
+ super(ServiceLauncher, self).wait()
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ finally:
+ if rpc:
+ rpc.cleanup()
+ self.stop()
+ return status
+
+
+class ServiceWrapper(object):
+ def __init__(self, service, workers):
+ self.service = service
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.sigcaught = None
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ self.sigcaught = signo
+ self.running = False
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, service):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ raise SignalExit(signal.SIGTERM)
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn_n(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_service(service)
+
+ def _start_child(self, wrap):
+ if len(wrap.forktimes) > wrap.workers:
+ # Limit ourselves to one process a second (over the period of
+ # number of workers * 1 second). This will allow workers to
+ # start up quickly but ensure we don't fork off children that
+ # die instantly too quickly.
+ if time.time() - wrap.forktimes[0] < wrap.workers:
+ LOG.info(_('Forking too fast, sleeping'))
+ time.sleep(1)
+
+ wrap.forktimes.pop(0)
+
+ wrap.forktimes.append(time.time())
+
+ pid = os.fork()
+ if pid == 0:
+ # NOTE(johannes): All exceptions are caught to ensure this
+ # doesn't fallback into the loop spawning children. It would
+ # be bad for a child to spawn more children.
+ status = 0
+ try:
+ self._child_process(wrap.service)
+ except SignalExit as exc:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[exc.signo]
+ LOG.info(_('Caught %s, exiting'), signame)
+ status = exc.code
+ except SystemExit as exc:
+ status = exc.code
+ except BaseException:
+ LOG.exception(_('Unhandled exception'))
+ status = 2
+ finally:
+ wrap.service.stop()
+
+ os._exit(status)
+
+ LOG.info(_('Started child %d'), pid)
+
+ wrap.children.add(pid)
+ self.children[pid] = wrap
+
+ return pid
+
+ def launch_service(self, service, workers=1):
+ wrap = ServiceWrapper(service, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ # Don't block if no child processes have exited
+ pid, status = os.waitpid(0, os.WNOHANG)
+ if not pid:
+ return None
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+ dict(pid=pid, sig=sig))
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)s exited with status %(code)d'),
+ dict(pid=pid, code=code))
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary"""
+
+ LOG.debug(_('Full set of CONF:'))
+ CONF.log_opt_values(LOG, std_logging.DEBUG)
+
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ # Yield to other threads if no children have exited
+ # Sleep for a short time to avoid excessive CPU usage
+ # (see bug #1095346)
+ eventlet.greenthread.sleep(.01)
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
+
+
+class Service(object):
+ """Service object for binaries running on hosts."""
+
+ def __init__(self, threads=1000):
+ self.tg = threadgroup.ThreadGroup(threads)
+
+ def start(self):
+ pass
+
+ def stop(self):
+ self.tg.stop()
+
+ def wait(self):
+ self.tg.wait()
+
+
+def launch(service, workers=None):
+ if workers:
+ launcher = ProcessLauncher()
+ launcher.launch_service(service, workers=workers)
+ else:
+ launcher = ServiceLauncher()
+ launcher.launch_service(service)
+ return launcher
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from eventlet import greenlet
+from eventlet import greenpool
+from eventlet import greenthread
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import loopingcall
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _thread_done(gt, *args, **kwargs):
+ """ Callback function to be passed to GreenThread.link() when we spawn()
+ Calls the :class:`ThreadGroup` to notify if.
+
+ """
+ kwargs['group'].thread_done(kwargs['thread'])
+
+
+class Thread(object):
+ """ Wrapper around a greenthread, that holds a reference to the
+ :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
+ it has done so it can be removed from the threads list.
+ """
+ def __init__(self, thread, group):
+ self.thread = thread
+ self.thread.link(_thread_done, group=group, thread=self)
+
+ def stop(self):
+ self.thread.kill()
+
+ def wait(self):
+ return self.thread.wait()
+
+
+class ThreadGroup(object):
+ """ The point of the ThreadGroup classis to:
+
+ * keep track of timers and greenthreads (making it easier to stop them
+ when need be).
+ * provide an easy API to add timers.
+ """
+ def __init__(self, thread_pool_size=10):
+ self.pool = greenpool.GreenPool(thread_pool_size)
+ self.threads = []
+ self.timers = []
+
+ def add_timer(self, interval, callback, initial_delay=None,
+ *args, **kwargs):
+ pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
+ pulse.start(interval=interval,
+ initial_delay=initial_delay)
+ self.timers.append(pulse)
+
+ def add_thread(self, callback, *args, **kwargs):
+ gt = self.pool.spawn(callback, *args, **kwargs)
+ th = Thread(gt, self)
+ self.threads.append(th)
+
+ def thread_done(self, thread):
+ self.threads.remove(thread)
+
+ def stop(self):
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ # don't kill the current thread.
+ continue
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception as ex:
+ LOG.exception(ex)
+ self.timers = []
+
+ def wait(self):
+ for x in self.timers:
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)
+ current = greenthread.getcurrent()
+ for x in self.threads:
+ if x is current:
+ continue
+ try:
+ x.wait()
+ except greenlet.GreenletExit:
+ pass
+ except Exception as ex:
+ LOG.exception(ex)