From a7d47c4ecea903f0706ac313abef888507ce0522 Mon Sep 17 00:00:00 2001 From: Michael Kerrin Date: Wed, 17 Apr 2013 16:41:50 +0000 Subject: [PATCH] Update to latest copy of OSLO incubator Specifically I wanted the change be84db3ab24ef94b6ec457bb299d48c51575e8a6 to oslo-incubator to fix my logging issue. I ignored the changes in common/policy.py as this change breaks the cinder unit tests. Fixes bug: 1170038 Change-Id: Id72417d58c8f4bf139aa082131154153a175689d --- cinder/openstack/common/eventlet_backdoor.py | 87 +++++ cinder/openstack/common/gettextutils.py | 23 +- cinder/openstack/common/jsonutils.py | 58 +++- cinder/openstack/common/log.py | 38 ++- cinder/openstack/common/loopingcall.py | 147 ++++++++ cinder/openstack/common/notifier/api.py | 1 - cinder/openstack/common/rootwrap/cmd.py | 128 +++++++ cinder/openstack/common/rootwrap/filters.py | 46 +++ cinder/openstack/common/rpc/amqp.py | 14 +- cinder/openstack/common/rpc/common.py | 31 +- cinder/openstack/common/rpc/dispatcher.py | 21 +- cinder/openstack/common/rpc/impl_fake.py | 12 +- cinder/openstack/common/rpc/impl_qpid.py | 6 +- cinder/openstack/common/rpc/impl_zmq.py | 20 +- cinder/openstack/common/rpc/proxy.py | 28 +- cinder/openstack/common/rpc/zmq_receiver.py | 41 +++ cinder/openstack/common/service.py | 332 +++++++++++++++++++ cinder/openstack/common/threadgroup.py | 114 +++++++ 18 files changed, 1077 insertions(+), 70 deletions(-) create mode 100644 cinder/openstack/common/eventlet_backdoor.py create mode 100644 cinder/openstack/common/loopingcall.py create mode 100755 cinder/openstack/common/rootwrap/cmd.py create mode 100755 cinder/openstack/common/rpc/zmq_receiver.py create mode 100644 cinder/openstack/common/service.py create mode 100644 cinder/openstack/common/threadgroup.py diff --git a/cinder/openstack/common/eventlet_backdoor.py b/cinder/openstack/common/eventlet_backdoor.py new file mode 100644 index 000000000..c0ad460fe --- /dev/null +++ b/cinder/openstack/common/eventlet_backdoor.py @@ -0,0 +1,87 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 OpenStack Foundation. +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import gc +import pprint +import sys +import traceback + +import eventlet +import eventlet.backdoor +import greenlet +from oslo.config import cfg + +eventlet_backdoor_opts = [ + cfg.IntOpt('backdoor_port', + default=None, + help='port for eventlet backdoor to listen') +] + +CONF = cfg.CONF +CONF.register_opts(eventlet_backdoor_opts) + + +def _dont_use_this(): + print "Don't use this, just disconnect instead" + + +def _find_objects(t): + return filter(lambda o: isinstance(o, t), gc.get_objects()) + + +def _print_greenthreads(): + for i, gt in enumerate(_find_objects(greenlet.greenlet)): + print i, gt + traceback.print_stack(gt.gr_frame) + print + + +def _print_nativethreads(): + for threadId, stack in sys._current_frames().items(): + print threadId + traceback.print_stack(stack) + print + + +def initialize_if_enabled(): + backdoor_locals = { + 'exit': _dont_use_this, # So we don't exit the entire process + 'quit': _dont_use_this, # So we don't exit the entire process + 'fo': _find_objects, + 'pgt': _print_greenthreads, + 'pnt': _print_nativethreads, + } + + if CONF.backdoor_port is None: + return None + + # NOTE(johannes): The standard sys.displayhook will print the value of + # the last expression and set it to __builtin__._, which overwrites + # the __builtin__._ that gettext sets. Let's switch to using pprint + # since it won't interact poorly with gettext, and it's easier to + # read the output too. + def displayhook(val): + if val is not None: + pprint.pprint(val) + sys.displayhook = displayhook + + sock = eventlet.listen(('localhost', CONF.backdoor_port)) + port = sock.getsockname()[1] + eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, + locals=backdoor_locals) + return port diff --git a/cinder/openstack/common/gettextutils.py b/cinder/openstack/common/gettextutils.py index 87e3520e6..5c0540b43 100644 --- a/cinder/openstack/common/gettextutils.py +++ b/cinder/openstack/common/gettextutils.py @@ -24,10 +24,27 @@ Usual usage in an openstack.common module: """ import gettext +import os - -t = gettext.translation('openstack-common', 'locale', fallback=True) +_localedir = os.environ.get('cinder'.upper() + '_LOCALEDIR') +_t = gettext.translation('cinder', localedir=_localedir, fallback=True) def _(msg): - return t.ugettext(msg) + return _t.ugettext(msg) + + +def install(domain): + """Install a _() function using the given translation domain. + + Given a translation domain, install a _() function using gettext's + install() function. + + The main difference from gettext.install() is that we allow + overriding the default localedir (e.g. /usr/share/locale) using + a translation-domain-specific environment variable (e.g. + NOVA_LOCALEDIR). + """ + gettext.install(domain, + localedir=os.environ.get(domain.upper() + '_LOCALEDIR'), + unicode=True) diff --git a/cinder/openstack/common/jsonutils.py b/cinder/openstack/common/jsonutils.py index 1e9262ed0..70134d419 100644 --- a/cinder/openstack/common/jsonutils.py +++ b/cinder/openstack/common/jsonutils.py @@ -38,11 +38,21 @@ import functools import inspect import itertools import json +import types import xmlrpclib from cinder.openstack.common import timeutils +_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, + inspect.isfunction, inspect.isgeneratorfunction, + inspect.isgenerator, inspect.istraceback, inspect.isframe, + inspect.iscode, inspect.isbuiltin, inspect.isroutine, + inspect.isabstract] + +_simple_types = (types.NoneType, int, basestring, bool, float, long) + + def to_primitive(value, convert_instances=False, convert_datetime=True, level=0, max_depth=3): """Convert a complex object into primitives. @@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, Therefore, convert_instances=True is lossy ... be aware. """ - nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod, - inspect.isfunction, inspect.isgeneratorfunction, - inspect.isgenerator, inspect.istraceback, inspect.isframe, - inspect.iscode, inspect.isbuiltin, inspect.isroutine, - inspect.isabstract] - for test in nasty: - if test(value): - return unicode(value) - - # value of itertools.count doesn't get caught by inspects - # above and results in infinite loop when list(value) is called. + # handle obvious types first - order of basic types determined by running + # full tests on nova project, resulting in the following counts: + # 572754 + # 460353 + # 379632 + # 274610 + # 199918 + # 114200 + # 51817 + # 26164 + # 6491 + # 283 + # 19 + if isinstance(value, _simple_types): + return value + + if isinstance(value, datetime.datetime): + if convert_datetime: + return timeutils.strtime(value) + else: + return value + + # value of itertools.count doesn't get caught by nasty_type_tests + # and results in infinite loop when list(value) is called. if type(value) == itertools.count: return unicode(value) @@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, convert_datetime=convert_datetime, level=level, max_depth=max_depth) + if isinstance(value, dict): + return dict((k, recursive(v)) for k, v in value.iteritems()) + elif isinstance(value, (list, tuple)): + return [recursive(lv) for lv in value] + # It's not clear why xmlrpclib created their own DateTime type, but # for our purposes, make it a datetime type which is explicitly # handled if isinstance(value, xmlrpclib.DateTime): value = datetime.datetime(*tuple(value.timetuple())[:6]) - if isinstance(value, (list, tuple)): - return [recursive(v) for v in value] - elif isinstance(value, dict): - return dict((k, recursive(v)) for k, v in value.iteritems()) - elif convert_datetime and isinstance(value, datetime.datetime): + if convert_datetime and isinstance(value, datetime.datetime): return timeutils.strtime(value) elif hasattr(value, 'iteritems'): return recursive(dict(value.iteritems()), level=level + 1) @@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, # Ignore class member vars. return recursive(value.__dict__, level=level + 1) else: + if any(test(value) for test in _nasty_type_tests): + return unicode(value) return value except TypeError: # Class objects are tricky since they may define something like diff --git a/cinder/openstack/common/log.py b/cinder/openstack/common/log.py index 9f321b946..c170971f9 100644 --- a/cinder/openstack/common/log.py +++ b/cinder/openstack/common/log.py @@ -29,6 +29,7 @@ It also allows setting of formatting information through conf. """ +import ConfigParser import cStringIO import inspect import itertools @@ -87,11 +88,11 @@ logging_cli_opts = [ metavar='PATH', deprecated_name='logfile', help='(Optional) Name of log file to output to. ' - 'If not set, logging will go to stdout.'), + 'If no default is set, logging will go to stdout.'), cfg.StrOpt('log-dir', deprecated_name='logdir', - help='(Optional) The directory to keep log files in ' - '(will be prepended to --log-file)'), + help='(Optional) The base directory used for relative ' + '--log-file paths'), cfg.BoolOpt('use-syslog', default=False, help='Use syslog for logging.'), @@ -111,9 +112,9 @@ generic_log_opts = [ log_opts = [ cfg.StrOpt('logging_context_format_string', - default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s ' - '[%(request_id)s %(user)s %(tenant)s] %(instance)s' - '%(message)s', + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [%(request_id)s %(user)s %(tenant)s] ' + '%(instance)s%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' @@ -323,10 +324,30 @@ def _create_logging_excepthook(product_name): return logging_excepthook +class LogConfigError(Exception): + + message = _('Error loading logging config %(log_config)s: %(err_msg)s') + + def __init__(self, log_config, err_msg): + self.log_config = log_config + self.err_msg = err_msg + + def __str__(self): + return self.message % dict(log_config=self.log_config, + err_msg=self.err_msg) + + +def _load_log_config(log_config): + try: + logging.config.fileConfig(log_config) + except ConfigParser.Error, exc: + raise LogConfigError(log_config, str(exc)) + + def setup(product_name): """Setup logging.""" if CONF.log_config: - logging.config.fileConfig(CONF.log_config) + _load_log_config(CONF.log_config) else: _setup_logging_from_conf() sys.excepthook = _create_logging_excepthook(product_name) @@ -411,14 +432,11 @@ def _setup_logging_from_conf(): else: log_root.setLevel(logging.WARNING) - level = logging.NOTSET for pair in CONF.default_log_levels: mod, _sep, level_name = pair.partition('=') level = logging.getLevelName(level_name) logger = logging.getLogger(mod) logger.setLevel(level) - for handler in log_root.handlers: - logger.addHandler(handler) _loggers = {} diff --git a/cinder/openstack/common/loopingcall.py b/cinder/openstack/common/loopingcall.py new file mode 100644 index 000000000..8be3a00eb --- /dev/null +++ b/cinder/openstack/common/loopingcall.py @@ -0,0 +1,147 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from eventlet import event +from eventlet import greenthread + +from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common import log as logging +from cinder.openstack.common import timeutils + +LOG = logging.getLogger(__name__) + + +class LoopingCallDone(Exception): + """Exception to break out and stop a LoopingCall. + + The poll-function passed to LoopingCall can raise this exception to + break out of the loop normally. This is somewhat analogous to + StopIteration. + + An optional return-value can be included as the argument to the exception; + this return-value will be returned by LoopingCall.wait() + + """ + + def __init__(self, retvalue=True): + """:param retvalue: Value that LoopingCall.wait() should return.""" + self.retvalue = retvalue + + +class LoopingCallBase(object): + def __init__(self, f=None, *args, **kw): + self.args = args + self.kw = kw + self.f = f + self._running = False + self.done = None + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() + + +class FixedIntervalLoopingCall(LoopingCallBase): + """A fixed interval looping call.""" + + def start(self, interval, initial_delay=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + start = timeutils.utcnow() + self.f(*self.args, **self.kw) + end = timeutils.utcnow() + if not self._running: + break + delay = interval - timeutils.delta_seconds(start, end) + if delay <= 0: + LOG.warn(_('task run outlasted interval by %s sec') % + -delay) + greenthread.sleep(delay if delay > 0 else 0) + except LoopingCallDone, e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_('in fixed duration looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn_n(_inner) + return self.done + + +# TODO(mikal): this class name is deprecated in Havana and should be removed +# in the I release +LoopingCall = FixedIntervalLoopingCall + + +class DynamicLoopingCall(LoopingCallBase): + """A looping call which sleeps until the next known event. + + The function called should return how long to sleep for before being + called again. + """ + + def start(self, initial_delay=None, periodic_interval_max=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + idle = self.f(*self.args, **self.kw) + if not self._running: + break + + if periodic_interval_max is not None: + idle = min(idle, periodic_interval_max) + LOG.debug(_('Dynamic looping call sleeping for %.02f ' + 'seconds'), idle) + greenthread.sleep(idle) + except LoopingCallDone, e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_('in dynamic looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn(_inner) + return self.done diff --git a/cinder/openstack/common/notifier/api.py b/cinder/openstack/common/notifier/api.py index 38b2daba7..6b82e4451 100644 --- a/cinder/openstack/common/notifier/api.py +++ b/cinder/openstack/common/notifier/api.py @@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__) notifier_opts = [ cfg.MultiStrOpt('notification_driver', default=[], - deprecated_name='list_notifier_drivers', help='Driver or drivers to handle sending notifications'), cfg.StrOpt('default_notification_level', default='INFO', diff --git a/cinder/openstack/common/rootwrap/cmd.py b/cinder/openstack/common/rootwrap/cmd.py new file mode 100755 index 000000000..78265e30c --- /dev/null +++ b/cinder/openstack/common/rootwrap/cmd.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Root wrapper for OpenStack services + + Filters which commands a service is allowed to run as another user. + + To use this with cinder, you should set the following in + cinder.conf: + rootwrap_config=/etc/cinder/rootwrap.conf + + You also need to let the cinder user run cinder-rootwrap + as root in sudoers: + cinder ALL = (root) NOPASSWD: /usr/bin/cinder-rootwrap + /etc/cinder/rootwrap.conf * + + Service packaging should deploy .filters files only on nodes where + they are needed, to avoid allowing more than is necessary. +""" + +import ConfigParser +import logging +import os +import pwd +import signal +import subprocess +import sys + + +RC_UNAUTHORIZED = 99 +RC_NOCOMMAND = 98 +RC_BADCONFIG = 97 +RC_NOEXECFOUND = 96 + + +def _subprocess_setup(): + # Python installs a SIGPIPE handler by default. This is usually not what + # non-Python subprocesses expect. + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + + +def _exit_error(execname, message, errorcode, log=True): + print "%s: %s" % (execname, message) + if log: + logging.error(message) + sys.exit(errorcode) + + +def main(): + # Split arguments, require at least a command + execname = sys.argv.pop(0) + if len(sys.argv) < 2: + _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False) + + configfile = sys.argv.pop(0) + userargs = sys.argv[:] + + # Add ../ to sys.path to allow running from branch + possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname), + os.pardir, os.pardir)) + if os.path.exists(os.path.join(possible_topdir, "cinder", "__init__.py")): + sys.path.insert(0, possible_topdir) + + from cinder.openstack.common.rootwrap import wrapper + + # Load configuration + try: + rawconfig = ConfigParser.RawConfigParser() + rawconfig.read(configfile) + config = wrapper.RootwrapConfig(rawconfig) + except ValueError as exc: + msg = "Incorrect value in %s: %s" % (configfile, exc.message) + _exit_error(execname, msg, RC_BADCONFIG, log=False) + except ConfigParser.Error: + _exit_error(execname, "Incorrect configuration file: %s" % configfile, + RC_BADCONFIG, log=False) + + if config.use_syslog: + wrapper.setup_syslog(execname, + config.syslog_log_facility, + config.syslog_log_level) + + # Execute command if it matches any of the loaded filters + filters = wrapper.load_filters(config.filters_path) + try: + filtermatch = wrapper.match_filter(filters, userargs, + exec_dirs=config.exec_dirs) + if filtermatch: + command = filtermatch.get_command(userargs, + exec_dirs=config.exec_dirs) + if config.use_syslog: + logging.info("(%s > %s) Executing %s (filter match = %s)" % ( + os.getlogin(), pwd.getpwuid(os.getuid())[0], + command, filtermatch.name)) + + obj = subprocess.Popen(command, + stdin=sys.stdin, + stdout=sys.stdout, + stderr=sys.stderr, + preexec_fn=_subprocess_setup, + env=filtermatch.get_environment(userargs)) + obj.wait() + sys.exit(obj.returncode) + + except wrapper.FilterMatchNotExecutable as exc: + msg = ("Executable not found: %s (filter match = %s)" + % (exc.match.exec_path, exc.match.name)) + _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog) + + except wrapper.NoFilterMatched: + msg = ("Unauthorized command: %s (no filter matched)" + % ' '.join(userargs)) + _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog) diff --git a/cinder/openstack/common/rootwrap/filters.py b/cinder/openstack/common/rootwrap/filters.py index eadda256c..d9618af88 100644 --- a/cinder/openstack/common/rootwrap/filters.py +++ b/cinder/openstack/common/rootwrap/filters.py @@ -88,6 +88,52 @@ class RegExpFilter(CommandFilter): return False +class PathFilter(CommandFilter): + """Command filter checking that path arguments are within given dirs + + One can specify the following constraints for command arguments: + 1) pass - pass an argument as is to the resulting command + 2) some_str - check if an argument is equal to the given string + 3) abs path - check if a path argument is within the given base dir + + A typical rootwrapper filter entry looks like this: + # cmdname: filter name, raw command, user, arg_i_constraint [, ...] + chown: PathFilter, /bin/chown, root, nova, /var/lib/images + + """ + + def match(self, userargs): + command, arguments = userargs[0], userargs[1:] + + equal_args_num = len(self.args) == len(arguments) + exec_is_valid = super(PathFilter, self).match(userargs) + args_equal_or_pass = all( + arg == 'pass' or arg == value + for arg, value in zip(self.args, arguments) + if not os.path.isabs(arg) # arguments not specifying abs paths + ) + paths_are_within_base_dirs = all( + os.path.commonprefix([arg, os.path.realpath(value)]) == arg + for arg, value in zip(self.args, arguments) + if os.path.isabs(arg) # arguments specifying abs paths + ) + + return (equal_args_num and + exec_is_valid and + args_equal_or_pass and + paths_are_within_base_dirs) + + def get_command(self, userargs, exec_dirs=[]): + command, arguments = userargs[0], userargs[1:] + + # convert path values to canonical ones; copy other args as is + args = [os.path.realpath(value) if os.path.isabs(arg) else value + for arg, value in zip(self.args, arguments)] + + return super(PathFilter, self).get_command([command] + args, + exec_dirs) + + class DnsmasqFilter(CommandFilter): """Specific filter for the dnsmasq call (which includes env)""" diff --git a/cinder/openstack/common/rpc/amqp.py b/cinder/openstack/common/rpc/amqp.py index 832511cd7..9addfa1c7 100644 --- a/cinder/openstack/common/rpc/amqp.py +++ b/cinder/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: @@ -495,7 +498,6 @@ class MulticallProxyWaiter(object): data = self._dataqueue.get(timeout=self._timeout) result = self._process_data(data) except queue.Empty: - LOG.exception(_('Timed out waiting for RPC response.')) self.done() raise rpc_common.Timeout() except Exception: @@ -662,7 +664,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: - msg = rpc_common.serialize_msg(msg, force_envelope=True) + msg = rpc_common.serialize_msg(msg) conn.notify_send(topic, msg) diff --git a/cinder/openstack/common/rpc/common.py b/cinder/openstack/common/rpc/common.py index b2d1e91ff..9f0552e5e 100644 --- a/cinder/openstack/common/rpc/common.py +++ b/cinder/openstack/common/rpc/common.py @@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' -# TODO(russellb) Turn this on after Grizzly. -_SEND_RPC_ENVELOPE = False - - class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -122,7 +118,25 @@ class Timeout(RPCException): This exception is raised if the rpc_response_timeout is reached while waiting for a response from the remote side. """ - message = _("Timeout while waiting on RPC response.") + message = _('Timeout while waiting on RPC response - ' + 'topic: "%(topic)s", RPC method: "%(method)s" ' + 'info: "%(info)s"') + + def __init__(self, info=None, topic=None, method=None): + """ + :param info: Extra info to convey to the user + :param topic: The topic that the rpc call was sent to + :param rpc_method_name: The name of the rpc method being + called + """ + self.info = info + self.topic = topic + self.method = method + super(Timeout, self).__init__( + None, + info=info or _(''), + topic=topic or _(''), + method=method or _('')) class DuplicateMessageError(RPCException): @@ -325,7 +339,7 @@ def deserialize_remote_exception(conf, data): if not issubclass(klass, Exception): raise TypeError("Can only deserialize Exceptions") - failure = klass(**failure.get('kwargs', {})) + failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) except (AttributeError, TypeError, ImportError): return RemoteError(name, failure.get('message'), trace) @@ -441,10 +455,7 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg, force_envelope=False): - if not _SEND_RPC_ENVELOPE and not force_envelope: - return raw_msg - +def serialize_msg(raw_msg): # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more # information about this format. msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, diff --git a/cinder/openstack/common/rpc/dispatcher.py b/cinder/openstack/common/rpc/dispatcher.py index 7734a7fb7..85195d4a7 100644 --- a/cinder/openstack/common/rpc/dispatcher.py +++ b/cinder/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/cinder/openstack/common/rpc/impl_fake.py b/cinder/openstack/common/rpc/impl_fake.py index 0f2f53500..ec7200a7b 100644 --- a/cinder/openstack/common/rpc/impl_fake.py +++ b/cinder/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/cinder/openstack/common/rpc/impl_qpid.py b/cinder/openstack/common/rpc/impl_qpid.py index 5f181cdd2..24235b1f1 100644 --- a/cinder/openstack/common/rpc/impl_qpid.py +++ b/cinder/openstack/common/rpc/impl_qpid.py @@ -40,8 +40,8 @@ qpid_opts = [ cfg.StrOpt('qpid_hostname', default='localhost', help='Qpid broker hostname'), - cfg.StrOpt('qpid_port', - default='5672', + cfg.IntOpt('qpid_port', + default=5672, help='Qpid broker port'), cfg.ListOpt('qpid_hosts', default=['$qpid_hostname:$qpid_port'], @@ -320,7 +320,7 @@ class Connection(object): # Reconnection is done by self.reconnect() self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat - self.connection.protocol = self.conf.qpid_protocol + self.connection.transport = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay def _register_consumer(self, consumer): diff --git a/cinder/openstack/common/rpc/impl_zmq.py b/cinder/openstack/common/rpc/impl_zmq.py index 6e3d93bad..d3d3599e8 100644 --- a/cinder/openstack/common/rpc/impl_zmq.py +++ b/cinder/openstack/common/rpc/impl_zmq.py @@ -221,7 +221,7 @@ class ZmqClient(object): def cast(self, msg_id, topic, data, envelope=False): msg_id = msg_id or 0 - if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + if not envelope: self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) return @@ -276,7 +276,8 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -295,11 +296,16 @@ class InternalContext(object): def reply(self, ctx, proxy, msg_id=None, context=None, topic=None, msg=None): """Reply to a casted call.""" - # Our real method is curried into msg['args'] + # NOTE(ewindisch): context kwarg exists for Grizzly compat. + # this may be able to be removed earlier than + # 'I' if ConsumerBase.process were refactored. + if type(msg) is list: + payload = msg[-1] + else: + payload = msg - child_ctx = RpcContext.unmarshal(msg[0]) response = ConsumerBase.normalize_reply( - self._get_response(child_ctx, proxy, topic, msg[1]), + self._get_response(ctx, proxy, topic, payload), ctx.replies) LOG.debug(_("Sending reply")) @@ -346,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): @@ -685,8 +691,8 @@ def _call(addr, context, topic, msg, timeout=None, 'method': '-reply', 'args': { 'msg_id': msg_id, - 'context': mcontext, 'topic': reply_topic, + # TODO(ewindisch): safe to remove mcontext in I. 'msg': [mcontext, msg] } } diff --git a/cinder/openstack/common/rpc/proxy.py b/cinder/openstack/common/rpc/proxy.py index 23305d41d..4ddc5c936 100644 --- a/cinder/openstack/common/rpc/proxy.py +++ b/cinder/openstack/common/rpc/proxy.py @@ -58,9 +58,13 @@ class RpcProxy(object): """Return the topic to use for a message.""" return topic if topic else self.topic + @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. @@ -68,16 +72,21 @@ class RpcProxy(object): :param context: The request context :param msg: The message to send, including the method and args. :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. :param timeout: (Optional) A timeout to use when waiting for the response. If no timeout is specified, a default timeout will be used that is usually sufficient. - :param version: (Optional) Override the requested API version in this - message. :returns: The return value from the remote method. """ self._set_version(msg, version) - return rpc.call(context, self._get_topic(topic), msg, timeout) + real_topic = self._get_topic(topic) + try: + return rpc.call(context, real_topic, msg, timeout) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) def multicall(self, context, msg, topic=None, version=None, timeout=None): """rpc.multicall() a remote method. @@ -85,17 +94,22 @@ class RpcProxy(object): :param context: The request context :param msg: The message to send, including the method and args. :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. :param timeout: (Optional) A timeout to use when waiting for the response. If no timeout is specified, a default timeout will be used that is usually sufficient. - :param version: (Optional) Override the requested API version in this - message. :returns: An iterator that lets you process each of the returned values from the remote method as they arrive. """ self._set_version(msg, version) - return rpc.multicall(context, self._get_topic(topic), msg, timeout) + real_topic = self._get_topic(topic) + try: + return rpc.multicall(context, real_topic, msg, timeout) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) def cast(self, context, msg, topic=None, version=None): """rpc.cast() a remote method. diff --git a/cinder/openstack/common/rpc/zmq_receiver.py b/cinder/openstack/common/rpc/zmq_receiver.py new file mode 100755 index 000000000..e4c6ee30e --- /dev/null +++ b/cinder/openstack/common/rpc/zmq_receiver.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import sys + +from oslo.config import cfg + +from cinder.openstack.common import log as logging +from cinder.openstack.common import rpc +from cinder.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='oslo') + logging.setup("oslo") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() diff --git a/cinder/openstack/common/service.py b/cinder/openstack/common/service.py new file mode 100644 index 000000000..8600a0b08 --- /dev/null +++ b/cinder/openstack/common/service.py @@ -0,0 +1,332 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Generic Node base class for all workers that run on hosts.""" + +import errno +import os +import random +import signal +import sys +import time + +import eventlet +import logging as std_logging +from oslo.config import cfg + +from cinder.openstack.common import eventlet_backdoor +from cinder.openstack.common.gettextutils import _ +from cinder.openstack.common import importutils +from cinder.openstack.common import log as logging +from cinder.openstack.common import threadgroup + + +rpc = importutils.try_import('cinder.openstack.common.rpc') +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class Launcher(object): + """Launch one or more services and wait for them to complete.""" + + def __init__(self): + """Initialize the service launcher. + + :returns: None + + """ + self._services = threadgroup.ThreadGroup() + eventlet_backdoor.initialize_if_enabled() + + @staticmethod + def run_service(service): + """Start and wait for a service to finish. + + :param service: service to run and wait for. + :returns: None + + """ + service.start() + service.wait() + + def launch_service(self, service): + """Load and start the given service. + + :param service: The service you would like to start. + :returns: None + + """ + self._services.add_thread(self.run_service, service) + + def stop(self): + """Stop all services which are currently running. + + :returns: None + + """ + self._services.stop() + + def wait(self): + """Waits until all services have been stopped, and then returns. + + :returns: None + + """ + self._services.wait() + + +class SignalExit(SystemExit): + def __init__(self, signo, exccode=1): + super(SignalExit, self).__init__(exccode) + self.signo = signo + + +class ServiceLauncher(Launcher): + def _handle_signal(self, signo, frame): + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + raise SignalExit(signo) + + def wait(self): + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + + status = None + try: + super(ServiceLauncher, self).wait() + except SignalExit as exc: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'}[exc.signo] + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + except SystemExit as exc: + status = exc.code + finally: + if rpc: + rpc.cleanup() + self.stop() + return status + + +class ServiceWrapper(object): + def __init__(self, service, workers): + self.service = service + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + def __init__(self): + self.children = {} + self.sigcaught = None + self.running = True + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signo, frame): + self.sigcaught = signo + self.running = False + + # Allow the process to be killed again and die from natural causes + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read() + + LOG.info(_('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process(self, service): + # Setup child signal handlers differently + def _sigterm(*args): + signal.signal(signal.SIGTERM, signal.SIG_DFL) + raise SignalExit(signal.SIGTERM) + + signal.signal(signal.SIGTERM, _sigterm) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn_n(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.run_service(service) + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. + if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + status = 0 + try: + self._child_process(wrap.service) + except SignalExit as exc: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'}[exc.signo] + LOG.info(_('Caught %s, exiting'), signame) + status = exc.code + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_('Unhandled exception')) + status = 2 + finally: + wrap.service.stop() + + os._exit(status) + + LOG.info(_('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_service(self, service, workers=1): + wrap = ServiceWrapper(service, workers) + + LOG.info(_('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + # Don't block if no child processes have exited + pid, status = os.waitpid(0, os.WNOHANG) + if not pid: + return None + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_('Child %(pid)d killed by signal %(sig)d'), + dict(pid=pid, sig=sig)) + else: + code = os.WEXITSTATUS(status) + LOG.info(_('Child %(pid)s exited with status %(code)d'), + dict(pid=pid, code=code)) + + if pid not in self.children: + LOG.warning(_('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def wait(self): + """Loop waiting on children to die and respawning as necessary""" + + LOG.debug(_('Full set of CONF:')) + CONF.log_opt_values(LOG, std_logging.DEBUG) + + while self.running: + wrap = self._wait_child() + if not wrap: + # Yield to other threads if no children have exited + # Sleep for a short time to avoid excessive CPU usage + # (see bug #1095346) + eventlet.greenthread.sleep(.01) + continue + + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + if self.sigcaught: + signame = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'}[self.sigcaught] + LOG.info(_('Caught %s, stopping children'), signame) + + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Wait for children to die + if self.children: + LOG.info(_('Waiting on %d children to exit'), len(self.children)) + while self.children: + self._wait_child() + + +class Service(object): + """Service object for binaries running on hosts.""" + + def __init__(self, threads=1000): + self.tg = threadgroup.ThreadGroup(threads) + + def start(self): + pass + + def stop(self): + self.tg.stop() + + def wait(self): + self.tg.wait() + + +def launch(service, workers=None): + if workers: + launcher = ProcessLauncher() + launcher.launch_service(service, workers=workers) + else: + launcher = ServiceLauncher() + launcher.launch_service(service) + return launcher diff --git a/cinder/openstack/common/threadgroup.py b/cinder/openstack/common/threadgroup.py new file mode 100644 index 000000000..5d6ec006b --- /dev/null +++ b/cinder/openstack/common/threadgroup.py @@ -0,0 +1,114 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet import greenlet +from eventlet import greenpool +from eventlet import greenthread + +from cinder.openstack.common import log as logging +from cinder.openstack.common import loopingcall + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + """ Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ + kwargs['group'].thread_done(kwargs['thread']) + + +class Thread(object): + """ Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. + """ + def __init__(self, thread, group): + self.thread = thread + self.thread.link(_thread_done, group=group, thread=self) + + def stop(self): + self.thread.kill() + + def wait(self): + return self.thread.wait() + + +class ThreadGroup(object): + """ The point of the ThreadGroup classis to: + + * keep track of timers and greenthreads (making it easier to stop them + when need be). + * provide an easy API to add timers. + """ + def __init__(self, thread_pool_size=10): + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(gt, self) + self.threads.append(th) + + def thread_done(self, thread): + self.threads.remove(thread) + + def stop(self): + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + + for x in self.timers: + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + self.timers = [] + + def wait(self): + for x in self.timers: + try: + x.wait() + except greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) + current = greenthread.getcurrent() + for x in self.threads: + if x is current: + continue + try: + x.wait() + except greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) -- 2.45.2