try:
# Engine does not currently support query by namespace/metric
# so we pass None/None and do any filtering locally
+ null_kwargs = {'metric_namespace': None,
+ 'metric_name': None}
watch_data = self.engine_rpcapi.show_watch_metric(con,
- namespace=None,
- metric_name=None)
+ **null_kwargs)
except rpc_common.RemoteError as ex:
return exception.map_remote_error(ex)
from heat.openstack.common import local
from heat.common import exception
from heat.common import wsgi
+from heat.openstack.common import context
from heat.openstack.common import importutils
from heat.openstack.common import uuidutils
from heat.db import api as db_api
return 'req-' + uuidutils.generate_uuid()
-class RequestContext(object):
+class RequestContext(context.RequestContext):
"""
Stores information about the security context under which the user
accesses the system, as well as additional request information.
:param kwargs: Extra arguments that might be present, but we ignore
because they possibly came in from older rpc messages.
"""
+ super(RequestContext, self).__init__(auth_token=auth_token,
+ user=username, tenant=tenant,
+ is_admin=is_admin,
+ read_only=read_only,
+ show_deleted=show_deleted,
+ request_id='unused')
- self.auth_token = auth_token
self.username = username
self.password = password
self.aws_creds = aws_creds
self.aws_auth_uri = aws_auth_uri
- self.tenant = tenant
self.tenant_id = tenant_id
self.auth_url = auth_url
self.roles = roles or []
- self.is_admin = is_admin
- self.read_only = read_only
- self._show_deleted = show_deleted
self.owner_is_tenant = owner_is_tenant
if overwrite or not hasattr(local.store, 'context'):
self.update_store()
def to_dict(self):
return {'auth_token': self.auth_token,
- 'username': self.username,
+ 'username': self.user,
'password': self.password,
'aws_creds': self.aws_creds,
'aws_auth_uri': self.aws_auth_uri,
"""Return the owner to correlate with an image."""
return self.tenant if self.owner_is_tenant else self.user
- @property
- def show_deleted(self):
- """Admins can see deleted by default."""
- if self._show_deleted or self.is_admin:
- return True
- return False
-
def get_admin_context(read_deleted="no"):
return RequestContext(is_admin=True)
return result
@request_context
- def show_watch_metric(self, cnxt, namespace=None, metric_name=None):
+ def show_watch_metric(self, cnxt, metric_namespace=None, metric_name=None):
'''
The show_watch_metric method returns the datapoints for a metric
arg1 -> RPC context.
# DB API and schema do not yet allow us to easily query by
# namespace/metric, but we will want this at some point.
# For now, the API can query all metric data and filter locally.
- if namespace is not None or metric_name is not None:
+ if metric_namespace is not None or metric_name is not None:
logger.error("Filtering by namespace/metric not yet supported")
return
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Simple class that stores security context information in the web request.
+
+Projects should subclass this class if they wish to enhance the request
+context or provide additional information in their specific WSGI pipeline.
+"""
+
+import itertools
+
+from heat.openstack.common import uuidutils
+
+
+def generate_request_id():
+ return 'req-%s' % uuidutils.generate_uuid()
+
+
+class RequestContext(object):
+
+ """
+ Stores information about the security context under which the user
+ accesses the system, as well as additional request information.
+ """
+
+ def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
+ read_only=False, show_deleted=False, request_id=None):
+ self.auth_token = auth_token
+ self.user = user
+ self.tenant = tenant
+ self.is_admin = is_admin
+ self.read_only = read_only
+ self.show_deleted = show_deleted
+ if not request_id:
+ request_id = generate_request_id()
+ self.request_id = request_id
+
+ def to_dict(self):
+ return {'user': self.user,
+ 'tenant': self.tenant,
+ 'is_admin': self.is_admin,
+ 'read_only': self.read_only,
+ 'show_deleted': self.show_deleted,
+ 'auth_token': self.auth_token,
+ 'request_id': self.request_id}
+
+
+def get_admin_context(show_deleted="no"):
+ context = RequestContext(None,
+ tenant=None,
+ is_admin=True,
+ show_deleted=show_deleted)
+ return context
+
+
+def get_context_from_function_and_args(function, args, kwargs):
+ """Find an arg of type RequestContext and return it.
+
+ This is useful in a couple of decorators where we don't
+ know much about the function we're wrapping.
+ """
+
+ for arg in itertools.chain(kwargs.values(), args):
+ if isinstance(arg, RequestContext):
+ return arg
+
+ return None
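A quick usage sketch of the new common context module (illustrative only, not part of the patch):

    from heat.openstack.common import context

    ctx = context.RequestContext(user='demo', tenant='demo-tenant')
    # request_id defaults to 'req-<uuid>' when not supplied
    print(ctx.request_id)
    # to_dict() produces a primitive mapping suitable for RPC transport
    assert ctx.to_dict()['tenant'] == 'demo-tenant'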
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright (c) 2012 Openstack, LLC.
+# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# License for the specific language governing permissions and limitations
# under the License.
+from __future__ import print_function
+
import gc
import pprint
import sys
def _dont_use_this():
- print "Don't use this, just disconnect instead"
+ print("Don't use this, just disconnect instead")
def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
- print i, gt
+ print(i, gt)
traceback.print_stack(gt.gr_frame)
- print
+ print()
+
+
+def _print_nativethreads():
+ for threadId, stack in sys._current_frames().items():
+ print(threadId)
+ traceback.print_stack(stack)
+ print()
def initialize_if_enabled():
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
+ 'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
def _wrap(*args, **kw):
try:
return f(*args, **kw)
- except Exception, e:
+ except Exception as e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception(_('Uncaught exception'))
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
"""
import gettext
+import os
-
-t = gettext.translation('openstack-common', 'locale', fallback=True)
+_localedir = os.environ.get('heat'.upper() + '_LOCALEDIR')
+_t = gettext.translation('heat', localedir=_localedir, fallback=True)
def _(msg):
- return t.ugettext(msg)
+ return _t.ugettext(msg)
+
+
+def install(domain):
+ """Install a _() function using the given translation domain.
+
+ Given a translation domain, install a _() function using gettext's
+ install() function.
+
+ The main difference from gettext.install() is that we allow
+ overriding the default localedir (e.g. /usr/share/locale) using
+ a translation-domain-specific environment variable (e.g.
+ NOVA_LOCALEDIR).
+ """
+ gettext.install(domain,
+ localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
+ unicode=True)
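A hedged sketch of the new install() helper in use; the HEAT_LOCALEDIR variable name follows the domain.upper() + '_LOCALEDIR' convention in the code above, and the path is illustrative:

    import os

    from heat.openstack.common import gettextutils

    # Optionally override the default localedir before installing.
    os.environ.setdefault('HEAT_LOCALEDIR', '/opt/heat/locale')
    gettextutils.install('heat')   # installs a unicode-aware _() builtin
    print(_('Hello'))              # falls back to the original string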
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import datetime
+import functools
import inspect
import itertools
import json
+import types
import xmlrpclib
+import six
+
from heat.openstack.common import timeutils
-def to_primitive(value, convert_instances=False, level=0):
+_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+ inspect.isfunction, inspect.isgeneratorfunction,
+ inspect.isgenerator, inspect.istraceback, inspect.isframe,
+ inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+ inspect.isabstract]
+
+_simple_types = (types.NoneType, int, basestring, bool, float, long)
+
+
+def to_primitive(value, convert_instances=False, convert_datetime=True,
+ level=0, max_depth=3):
"""Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances,
Therefore, convert_instances=True is lossy ... be aware.
"""
- nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
- inspect.isfunction, inspect.isgeneratorfunction,
- inspect.isgenerator, inspect.istraceback, inspect.isframe,
- inspect.iscode, inspect.isbuiltin, inspect.isroutine,
- inspect.isabstract]
- for test in nasty:
- if test(value):
- return unicode(value)
-
- # value of itertools.count doesn't get caught by inspects
- # above and results in infinite loop when list(value) is called.
+ # handle obvious types first - order of basic types determined by running
+ # full tests on nova project, resulting in the following counts:
+ # 572754 <type 'NoneType'>
+ # 460353 <type 'int'>
+ # 379632 <type 'unicode'>
+ # 274610 <type 'str'>
+ # 199918 <type 'dict'>
+ # 114200 <type 'datetime.datetime'>
+ # 51817 <type 'bool'>
+ # 26164 <type 'list'>
+ # 6491 <type 'float'>
+ # 283 <type 'tuple'>
+ # 19 <type 'long'>
+ if isinstance(value, _simple_types):
+ return value
+
+ if isinstance(value, datetime.datetime):
+ if convert_datetime:
+ return timeutils.strtime(value)
+ else:
+ return value
+
+ # value of itertools.count doesn't get caught by nasty_type_tests
+ # and results in infinite loop when list(value) is called.
if type(value) == itertools.count:
- return unicode(value)
+ return six.text_type(value)
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
# tests that raise an exception in a mocked method that
if getattr(value, '__module__', None) == 'mox':
return 'mock'
- if level > 3:
+ if level > max_depth:
return '?'
# The try block may not be necessary after the class check above,
# but just in case ...
try:
+ recursive = functools.partial(to_primitive,
+ convert_instances=convert_instances,
+ convert_datetime=convert_datetime,
+ level=level,
+ max_depth=max_depth)
+ if isinstance(value, dict):
+ return dict((k, recursive(v)) for k, v in value.iteritems())
+ elif isinstance(value, (list, tuple)):
+ return [recursive(lv) for lv in value]
+
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
- if isinstance(value, (list, tuple)):
- o = []
- for v in value:
- o.append(to_primitive(v, convert_instances=convert_instances,
- level=level))
- return o
- elif isinstance(value, dict):
- o = {}
- for k, v in value.iteritems():
- o[k] = to_primitive(v, convert_instances=convert_instances,
- level=level)
- return o
- elif isinstance(value, datetime.datetime):
+ if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif hasattr(value, 'iteritems'):
- return to_primitive(dict(value.iteritems()),
- convert_instances=convert_instances,
- level=level + 1)
+ return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
- return to_primitive(list(value),
- convert_instances=convert_instances,
- level=level)
+ return recursive(list(value))
elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
- return to_primitive(value.__dict__,
- convert_instances=convert_instances,
- level=level + 1)
+ return recursive(value.__dict__, level=level + 1)
else:
+ if any(test(value) for test in _nasty_type_tests):
+ return six.text_type(value)
return value
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
- return unicode(value)
+ return six.text_type(value)
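To make the refactored recursion concrete: the functools.partial carries convert_instances, convert_datetime and max_depth through every level, so nested containers honour the same settings. A small sketch, assuming this module's import path:

    import datetime

    from heat.openstack.common import jsonutils

    data = {'when': datetime.datetime(2013, 1, 1),
            'nested': {'nums': (1, 2, 3)}}
    # datetimes become strings (via timeutils.strtime), tuples become lists
    prim = jsonutils.to_primitive(data)
    print(jsonutils.dumps(prim))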
def dumps(value, default=to_primitive, **kwargs):
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
+ # NOTE(mikal): this bit is confusing. What is stored is a weak
+            # reference, not the value itself. We therefore need to look up
+            # the weak reference and return the inner value here.
rval = rval()
return rval
return corolocal.local.__setattr__(self, attr, value)
+# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal()
+
+# A "weak" store uses weak references and allows an object to fall out of scope
+# when it falls out of scope in the code that uses the thread local storage. A
+# "strong" store will hold a reference to the object so that it never falls out
+# of scope.
+weak_store = WeakLocal()
+strong_store = corolocal.local
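A brief sketch of the difference described above (illustrative only; on CPython the weakly-held object is collected as soon as its last strong reference is dropped):

    from heat.openstack.common import local

    class Context(object):
        pass

    # strong_store keeps the object alive for the thread's lifetime
    local.strong_store.context = Context()

    # weak_store only holds a weak reference
    ctx = Context()
    local.weak_store.context = ctx
    del ctx
    print(local.weak_store.context)   # None once collected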
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
"""
+import ConfigParser
import cStringIO
import inspect
import itertools
import logging.config
import logging.handlers
import os
-import stat
import sys
import traceback
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
+from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import local
-from heat.openstack.common import notifier
-_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
- default=_DEFAULT_LOG_FORMAT,
+ default=None,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
- 'Default: %(default)s'),
+ 'This option is deprecated. Please use '
+ 'logging_context_format_string and '
+ 'logging_default_format_string instead.'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
- 'If not set, logging will go to stdout.'),
+ 'If no default is set, logging will go to stdout.'),
cfg.StrOpt('log-dir',
deprecated_name='logdir',
- help='(Optional) The directory to keep log files in '
- '(will be prepended to --log-file)'),
+ help='(Optional) The base directory used for relative '
+ '--log-file paths'),
cfg.BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
generic_log_opts = [
cfg.BoolOpt('use_stderr',
default=True,
- help='Log output to standard error'),
- cfg.StrOpt('logfile_mode',
- default='0644',
- help='Default file mode used when creating log files'),
+ help='Log output to standard error')
]
log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
- '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
- '%(message)s',
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+ '%(name)s [%(request_id)s %(user)s %(tenant)s] '
+ '%(instance)s%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
return '%s.log' % (os.path.join(logdir, binary),)
-class ContextAdapter(logging.LoggerAdapter):
+class BaseLoggerAdapter(logging.LoggerAdapter):
+
+ def audit(self, msg, *args, **kwargs):
+ self.log(logging.AUDIT, msg, *args, **kwargs)
+
+
+class LazyAdapter(BaseLoggerAdapter):
+ def __init__(self, name='unknown', version='unknown'):
+ self._logger = None
+ self.extra = {}
+ self.name = name
+ self.version = version
+
+ @property
+ def logger(self):
+ if not self._logger:
+ self._logger = getLogger(self.name, self.version)
+ return self._logger
+
+
+class ContextAdapter(BaseLoggerAdapter):
warn = logging.LoggerAdapter.warning
def __init__(self, logger, project_name, version_string):
self.project = project_name
self.version = version_string
- def audit(self, msg, *args, **kwargs):
- self.log(logging.AUDIT, msg, *args, **kwargs)
+ @property
+ def handlers(self):
+ return self.logger.handlers
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated: %s") % msg
return jsonutils.dumps(message)
-class PublishErrorsHandler(logging.Handler):
- def emit(self, record):
- if ('heat.openstack.common.notifier.log_notifier' in
- CONF.notification_driver):
- return
- notifier.api.notify(None, 'error.publisher',
- 'error_notification',
- notifier.api.ERROR,
- dict(error=record.msg))
-
-
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
return logging_excepthook
+class LogConfigError(Exception):
+
+ message = _('Error loading logging config %(log_config)s: %(err_msg)s')
+
+ def __init__(self, log_config, err_msg):
+ self.log_config = log_config
+ self.err_msg = err_msg
+
+ def __str__(self):
+ return self.message % dict(log_config=self.log_config,
+ err_msg=self.err_msg)
+
+
+def _load_log_config(log_config):
+ try:
+ logging.config.fileConfig(log_config)
+ except ConfigParser.Error as exc:
+ raise LogConfigError(log_config, str(exc))
+
+
def setup(product_name):
"""Setup logging."""
- sys.excepthook = _create_logging_excepthook(product_name)
-
if CONF.log_config:
- try:
- logging.config.fileConfig(CONF.log_config)
- except Exception:
- traceback.print_exc()
- raise
+ _load_log_config(CONF.log_config)
else:
- _setup_logging_from_conf(product_name)
+ _setup_logging_from_conf()
+ sys.excepthook = _create_logging_excepthook(product_name)
def set_defaults(logging_context_format_string):
return facility
-def _setup_logging_from_conf(product_name):
- log_root = getLogger(product_name).logger
+def _setup_logging_from_conf():
+ log_root = getLogger(None).logger
for handler in log_root.handlers:
log_root.removeHandler(handler)
filelog = logging.handlers.WatchedFileHandler(logpath)
log_root.addHandler(filelog)
- mode = int(CONF.logfile_mode, 8)
- st = os.stat(logpath)
- if st.st_mode != (stat.S_IFREG | mode):
- os.chmod(logpath, mode)
-
if CONF.use_stderr:
streamlog = ColorHandler()
log_root.addHandler(streamlog)
log_root.addHandler(streamlog)
if CONF.publish_errors:
- log_root.addHandler(PublishErrorsHandler(logging.ERROR))
+ handler = importutils.import_object(
+ "heat.openstack.common.log_handler.PublishErrorsHandler",
+ logging.ERROR)
+ log_root.addHandler(handler)
+ datefmt = CONF.log_date_format
for handler in log_root.handlers:
- datefmt = CONF.log_date_format
+ # NOTE(alaski): CONF.log_format overrides everything currently. This
+ # should be deprecated in favor of context aware formatting.
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
- handler.setFormatter(LegacyFormatter(datefmt=datefmt))
+ log_root.info('Deprecated: log_format is now deprecated and will '
+ 'be removed in the next release')
+ else:
+ handler.setFormatter(ContextFormatter(datefmt=datefmt))
if CONF.debug:
log_root.setLevel(logging.DEBUG)
else:
log_root.setLevel(logging.WARNING)
- level = logging.NOTSET
for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod)
logger.setLevel(level)
- for handler in log_root.handlers:
- logger.addHandler(handler)
_loggers = {}
return _loggers[name]
+def getLazyLogger(name='unknown', version='unknown'):
+ """
+    Create a pass-through logger that does not create the real logger
+    until it is really needed, and that delegates all calls to the real
+    logger once it is created.
+ """
+ return LazyAdapter(name, version)
+
+
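A one-line usage sketch for the lazy logger: it is safe to create at module import time, before logging has been configured, because the real logger is only built on first use:

    from heat.openstack.common import log as logging

    LOG = logging.getLazyLogger(__name__)
    LOG.info('real logger created on this first call')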
class WritableLogger(object):
"""A thin wrapper that responds to `write` and logs."""
self.logger.log(self.level, msg)
-class LegacyFormatter(logging.Formatter):
+class ContextFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
- except LoopingCallDone, e:
+ except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.debug(_('Dynamic looping call sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
- except LoopingCallDone, e:
+ except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2012 OpenStack LLC.
+# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
Network-related utilities and helper functions.
"""
-import logging
+from heat.openstack.common import log as logging
+
LOG = logging.getLogger(__name__)
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
- deprecated_name='list_notifier_drivers',
help='Driver or drivers to handle sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright (c) 2012 OpenStack, LLC.
+# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
"""
import abc
-import logging
import re
import urllib
+import six
import urllib2
from heat.openstack.common.gettextutils import _
from heat.openstack.common import jsonutils
+from heat.openstack.common import log as logging
LOG = logging.getLogger(__name__)
or_list.append(AndCheck(and_list))
# If we have only one check, omit the "or"
- if len(or_list) == 0:
+ if not or_list:
return FalseCheck()
elif len(or_list) == 1:
return or_list[0]
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
if self.kind in creds:
- return match == unicode(creds[self.kind])
+ return match == six.text_type(creds[self.kind])
return False
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import os
+import random
+import shlex
+import signal
+
+from eventlet.green import subprocess
+from eventlet import greenthread
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class InvalidArgumentError(Exception):
+ def __init__(self, message=None):
+ super(InvalidArgumentError, self).__init__(message)
+
+
+class UnknownArgumentError(Exception):
+ def __init__(self, message=None):
+ super(UnknownArgumentError, self).__init__(message)
+
+
+class ProcessExecutionError(Exception):
+ def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+ description=None):
+ self.exit_code = exit_code
+ self.stderr = stderr
+ self.stdout = stdout
+ self.cmd = cmd
+ self.description = description
+
+ if description is None:
+ description = "Unexpected error while running command."
+ if exit_code is None:
+ exit_code = '-'
+ message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
+ % (description, cmd, exit_code, stdout, stderr))
+ super(ProcessExecutionError, self).__init__(message)
+
+
+class NoRootWrapSpecified(Exception):
+ def __init__(self, message=None):
+ super(NoRootWrapSpecified, self).__init__(message)
+
+
+def _subprocess_setup():
+ # Python installs a SIGPIPE handler by default. This is usually not what
+ # non-Python subprocesses expect.
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def execute(*cmd, **kwargs):
+ """
+ Helper method to shell out and execute a command through subprocess with
+ optional retry.
+
+ :param cmd: Passed to subprocess.Popen.
+ :type cmd: string
+ :param process_input: Send to opened process.
+    :type process_input: string
+ :param check_exit_code: Single bool, int, or list of allowed exit
+ codes. Defaults to [0]. Raise
+ :class:`ProcessExecutionError` unless
+                            program exits with one of these codes.
+ :type check_exit_code: boolean, int, or [int]
+ :param delay_on_retry: True | False. Defaults to True. If set to True,
+ wait a short amount of time before retrying.
+ :type delay_on_retry: boolean
+ :param attempts: How many times to retry cmd.
+ :type attempts: int
+ :param run_as_root: True | False. Defaults to False. If set to True,
+ the command is prefixed by the command specified
+ in the root_helper kwarg.
+ :type run_as_root: boolean
+ :param root_helper: command to prefix to commands called with
+ run_as_root=True
+ :type root_helper: string
+ :param shell: whether or not there should be a shell used to
+ execute this command. Defaults to false.
+ :type shell: boolean
+ :returns: (stdout, stderr) from process execution
+ :raises: :class:`UnknownArgumentError` on
+ receiving unknown arguments
+ :raises: :class:`ProcessExecutionError`
+ """
+
+ process_input = kwargs.pop('process_input', None)
+ check_exit_code = kwargs.pop('check_exit_code', [0])
+ ignore_exit_code = False
+ delay_on_retry = kwargs.pop('delay_on_retry', True)
+ attempts = kwargs.pop('attempts', 1)
+ run_as_root = kwargs.pop('run_as_root', False)
+ root_helper = kwargs.pop('root_helper', '')
+ shell = kwargs.pop('shell', False)
+
+ if isinstance(check_exit_code, bool):
+ ignore_exit_code = not check_exit_code
+ check_exit_code = [0]
+ elif isinstance(check_exit_code, int):
+ check_exit_code = [check_exit_code]
+
+ if kwargs:
+ raise UnknownArgumentError(_('Got unknown keyword args '
+ 'to utils.execute: %r') % kwargs)
+
+ if run_as_root and os.geteuid() != 0:
+ if not root_helper:
+ raise NoRootWrapSpecified(
+ message=('Command requested root, but did not specify a root '
+ 'helper.'))
+ cmd = shlex.split(root_helper) + list(cmd)
+
+ cmd = map(str, cmd)
+
+ while attempts > 0:
+ attempts -= 1
+ try:
+ LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
+ _PIPE = subprocess.PIPE # pylint: disable=E1101
+
+ if os.name == 'nt':
+ preexec_fn = None
+ close_fds = False
+ else:
+ preexec_fn = _subprocess_setup
+ close_fds = True
+
+ obj = subprocess.Popen(cmd,
+ stdin=_PIPE,
+ stdout=_PIPE,
+ stderr=_PIPE,
+ close_fds=close_fds,
+ preexec_fn=preexec_fn,
+ shell=shell)
+ result = None
+ if process_input is not None:
+ result = obj.communicate(process_input)
+ else:
+ result = obj.communicate()
+ obj.stdin.close() # pylint: disable=E1101
+ _returncode = obj.returncode # pylint: disable=E1101
+ if _returncode:
+ LOG.debug(_('Result was %s') % _returncode)
+ if not ignore_exit_code and _returncode not in check_exit_code:
+ (stdout, stderr) = result
+ raise ProcessExecutionError(exit_code=_returncode,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=' '.join(cmd))
+ return result
+ except ProcessExecutionError:
+ if not attempts:
+ raise
+ else:
+ LOG.debug(_('%r failed. Retrying.'), cmd)
+ if delay_on_retry:
+ greenthread.sleep(random.randint(20, 200) / 100.0)
+ finally:
+ # NOTE(termie): this appears to be necessary to let the subprocess
+            #               call clean something up in between calls; without
+            #               it, two execute calls in a row hang the second one.
+ greenthread.sleep(0)
+
+
+def trycmd(*args, **kwargs):
+ """
+ A wrapper around execute() to more easily handle warnings and errors.
+
+ Returns an (out, err) tuple of strings containing the output of
+ the command's stdout and stderr. If 'err' is not empty then the
+ command can be considered to have failed.
+
+    :param discard_warnings: True | False. Defaults to False. If set to True,
+                             then for succeeding commands, stderr is cleared.
+
+ """
+ discard_warnings = kwargs.pop('discard_warnings', False)
+
+ try:
+ out, err = execute(*args, **kwargs)
+ failed = False
+    except ProcessExecutionError as exn:
+ out, err = '', str(exn)
+ failed = True
+
+ if not failed and discard_warnings and err:
+ # Handle commands that output to stderr but otherwise succeed
+ err = ''
+
+ return out, err
+
+
+def ssh_execute(ssh, cmd, process_input=None,
+ addl_env=None, check_exit_code=True):
+ LOG.debug(_('Running cmd (SSH): %s'), cmd)
+ if addl_env:
+ raise InvalidArgumentError(_('Environment not supported over SSH'))
+
+ if process_input:
+ # This is (probably) fixable if we need it...
+ raise InvalidArgumentError(_('process_input not supported over SSH'))
+
+ stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
+ channel = stdout_stream.channel
+
+ # NOTE(justinsb): This seems suspicious...
+ # ...other SSH clients have buffering issues with this approach
+ stdout = stdout_stream.read()
+ stderr = stderr_stream.read()
+ stdin_stream.close()
+
+ exit_status = channel.recv_exit_status()
+
+ # exit_status == -1 if no exit code was returned
+ if exit_status != -1:
+ LOG.debug(_('Result was %s') % exit_status)
+ if check_exit_code and exit_status != 0:
+ raise ProcessExecutionError(exit_code=exit_status,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=cmd)
+
+ return (stdout, stderr)
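Since this is a new file, a short caller-side sketch may help (assuming the module lands as heat.openstack.common.processutils; the commands and paths are illustrative):

    from heat.openstack.common import processutils

    # Raises ProcessExecutionError on a non-zero exit code by default.
    out, err = processutils.execute('ls', '-l', '/tmp')

    # trycmd() folds failures into the err string instead of raising.
    out, err = processutils.trycmd('ls', '/no/such/path')
    if err:
        print('command failed: %s' % err)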
rpc.proxy
"""
+import inspect
+
from oslo.config import cfg
+from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
+from heat.openstack.common import local
+from heat.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
-cfg.CONF.register_opts(rpc_opts)
+CONF = cfg.CONF
+CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
:returns: An instance of openstack.common.rpc.common.Connection
"""
- return _get_impl().create_connection(cfg.CONF, new=new)
+ return _get_impl().create_connection(CONF, new=new)
+
+
+def _check_for_lock():
+ if not CONF.debug:
+ return None
+
+ if ((hasattr(local.strong_store, 'locks_held')
+ and local.strong_store.locks_held)):
+ stack = ' :: '.join([frame[3] for frame in inspect.stack()])
+ LOG.warn(_('A RPC is being made while holding a lock. The locks '
+ 'currently held are %(locks)s. This is probably a bug. '
+ 'Please report it. Include the following: [%(stack)s].'),
+ {'locks': local.strong_store.locks_held,
+ 'stack': stack})
+ return True
+
+ return False
-def call(context, topic, msg, timeout=None):
+def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if a RPC call is made
+ with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().call(CONF, context, topic, msg, timeout)
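A hedged caller-side sketch of the new flag (the topic, method and ctxt here are illustrative):

    from heat.openstack.common import rpc

    # With check_for_lock=True a warning is logged if this call is made
    # while local.strong_store.locks_held is non-empty.
    result = rpc.call(ctxt, 'engine',
                      {'method': 'list_stacks', 'args': {}},
                      check_for_lock=True)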
def cast(context, topic, msg):
:returns: None
"""
- return _get_impl().cast(cfg.CONF, context, topic, msg)
+ return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
:returns: None
"""
- return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
+ return _get_impl().fanout_cast(CONF, context, topic, msg)
-def multicall(context, topic, msg, timeout=None):
+def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if a RPC call is made
+ with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False):
:returns: None
"""
- return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+ return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
:returns: None
"""
- return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+ return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
global _RPCIMPL
if _RPCIMPL is None:
try:
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ _RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
- impl = cfg.CONF.rpc_backend.replace('nova.rpc',
- 'nova.openstack.common.rpc')
+ impl = CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
AMQP, but is deprecated and predates this code.
"""
+import collections
import inspect
import sys
import uuid
from eventlet import greenpool
from eventlet import pools
+from eventlet import queue
from eventlet import semaphore
+# TODO(pekowski): Remove import cfg and the comment below in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common.rpc import common as rpc_common
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+ cfg.BoolOpt('amqp_rpc_single_reply_queue',
+ default=False,
+ help='Enable a fast single reply queue if using AMQP based '
+ 'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
+UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
+ self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
def empty(self):
while self.free_items:
self.get().close()
+ # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is that the above while loop gets all the cached connections from
+        # the pool and closes them, but never returns them to the pool,
+        # causing a pool leak. The unit tests hang waiting for an item to be
+        # returned to the pool. The unit tests get here via the tearDown()
+        # method. In the runtime code, it gets here via cleanup(), which only
+        # appears in service.py just before doing a sys.exit(), so cleanup()
+        # only happens once and the leakage is not a problem.
+ self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ self.connection.join_consumer_pool(callback,
+ pool_name,
+ topic,
+ exchange_name)
+
def consume_in_thread(self):
self.connection.consume_in_thread()
raise rpc_common.InvalidRPCConnectionReuse()
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+ """ Connection class for RPC replies / callbacks """
+ def __init__(self, conf, connection_pool):
+ self._call_waiters = {}
+ self._num_call_waiters = 0
+ self._num_call_waiters_wrn_threshhold = 10
+ self._reply_q = 'reply_' + uuid.uuid4().hex
+ super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+ self.declare_direct_consumer(self._reply_q, self._process_data)
+ self.consume_in_thread()
+
+ def _process_data(self, message_data):
+ msg_id = message_data.pop('_msg_id', None)
+ waiter = self._call_waiters.get(msg_id)
+ if not waiter:
+ LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+ ', message : %(data)s'), {'msg_id': msg_id,
+ 'data': message_data})
+ else:
+ waiter.put(message_data)
+
+ def add_call_waiter(self, waiter, msg_id):
+ self._num_call_waiters += 1
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+ LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshhold)
+ self._num_call_waiters_wrn_threshhold *= 2
+ self._call_waiters[msg_id] = waiter
+
+ def del_call_waiter(self, msg_id):
+ self._num_call_waiters -= 1
+ del self._call_waiters[msg_id]
+
+ def get_reply_q(self):
+ return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+ failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+ _add_unique_id(msg)
+ # If a reply_q exists, add the msg_id to the reply and pass the
+ # reply_q to direct_send() to use it as the response queue.
+    # Otherwise use the msg_id for backward compatibility.
+ if reply_q:
+ msg['_msg_id'] = msg_id
+ conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+ else:
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
+ self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
+ values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
- msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending, log_failure)
+ msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+ reply, failure, ending, log_failure)
if ending:
self.msg_id = None
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
msg.update(context_d)
-class ProxyCallback(object):
- """Calls methods on a proxy object based on method and args."""
+class _MsgIdCache(object):
+ """This class checks any duplicate messages."""
- def __init__(self, conf, proxy, connection_pool):
- self.proxy = proxy
+    # NOTE: This value could be made a configuration item, but it is
+    #       rarely necessary to change it, so leave it static for now.
+ DUP_MSG_CHECK_SIZE = 16
+
+ def __init__(self, **kwargs):
+ self.prev_msgids = collections.deque([],
+ maxlen=self.DUP_MSG_CHECK_SIZE)
+
+ def check_duplicate_message(self, message_data):
+ """AMQP consumers may read same message twice when exceptions occur
+ before ack is returned. This method prevents doing it.
+ """
+ if UNIQUE_ID in message_data:
+ msg_id = message_data[UNIQUE_ID]
+ if msg_id not in self.prev_msgids:
+ self.prev_msgids.append(msg_id)
+ else:
+ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+ """Add unique_id for checking duplicate messages."""
+ unique_id = uuid.uuid4().hex
+ msg.update({UNIQUE_ID: unique_id})
+ LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
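An illustrative round-trip of the duplicate-detection pieces added above (standalone sketch, not part of the patch):

    msg = {'method': 'echo', 'args': {}}
    _add_unique_id(msg)                   # stamps msg[UNIQUE_ID]

    cache = _MsgIdCache()
    cache.check_duplicate_message(msg)    # first delivery: accepted
    # a redelivery of the same message raises
    # rpc_common.DuplicateMessageError
    cache.check_duplicate_message(msg)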
+class _ThreadPoolWithWait(object):
+ """Base class for a delayed invocation manager used by
+ the Connection class to start up green threads
+ to handle incoming messages.
+ """
+
+ def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+ """Wraps a straight callback to allow it to be invoked in a green
+ thread.
+ """
+
+ def __init__(self, conf, callback, connection_pool):
+ """
+ :param conf: cfg.CONF instance
+ :param callback: a callable (probably a function)
+ :param connection_pool: connection pool as returned by
+ get_connection_pool()
+ """
+ super(CallbackWrapper, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.callback = callback
+
+ def __call__(self, message_data):
+ self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+ """Calls methods on a proxy object based on method and args."""
+
+ def __init__(self, conf, proxy, connection_pool):
+ super(ProxyCallback, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.proxy = proxy
+ self.msg_id_cache = _MsgIdCache()
+
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
- version = message_data.get('version', None)
+ version = message_data.get('version')
+ namespace = message_data.get('namespace')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method,
+ namespace, args)
- def _process_data(self, ctxt, version, method, args):
+ def _process_data(self, ctxt, version, method, namespace, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
"""
ctxt.update_store()
try:
- rval = self.proxy.dispatch(ctxt, version, method, **args)
+ rval = self.proxy.dispatch(ctxt, version, method, namespace,
+ **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
- LOG.exception(_('Exception during message handling'))
- ctxt.reply(None, sys.exc_info(),
- connection_pool=self.connection_pool)
+ # sys.exc_info() is deleted by LOG.exception().
+ exc_info = sys.exc_info()
+ LOG.error(_('Exception during message handling'),
+ exc_info=exc_info)
+ ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
+
+
+class MulticallProxyWaiter(object):
+ def __init__(self, conf, msg_id, timeout, connection_pool):
+ self._msg_id = msg_id
+ self._timeout = timeout or conf.rpc_response_timeout
+ self._reply_proxy = connection_pool.reply_proxy
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
+ self._dataqueue = queue.LightQueue()
+ # Add this caller to the reply proxy's call_waiters
+ self._reply_proxy.add_call_waiter(self, self._msg_id)
+ self.msg_id_cache = _MsgIdCache()
- def wait(self):
- """Wait for all callback threads to exit."""
- self.pool.waitall()
+ def put(self, data):
+ self._dataqueue.put(data)
+
+ def done(self):
+ if self._done:
+ return
+ self._done = True
+ # Remove this caller from reply proxy's call_waiters
+ self._reply_proxy.del_call_waiter(self._msg_id)
+ def _process_data(self, data):
+ result = None
+ self.msg_id_cache.check_duplicate_message(data)
+ if data['failure']:
+ failure = data['failure']
+ result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+ elif data.get('ending', False):
+ self._got_ending = True
+ else:
+ result = data['result']
+ return result
+ def __iter__(self):
+ """Return a result until we get a reply with an 'ending" flag"""
+ if self._done:
+ raise StopIteration
+ while True:
+ try:
+ data = self._dataqueue.get(timeout=self._timeout)
+ result = self._process_data(data)
+ except queue.Empty:
+ self.done()
+ raise rpc_common.Timeout()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
+ if isinstance(result, Exception):
+ self.done()
+ raise result
+ yield result
+
+
+# TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
self._done = False
self._got_ending = False
self._conf = conf
+ self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
return ConnectionContext(conf, connection_pool, pooled=not new)
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
+ # TODO(pekowski): Remove all these comments in Havana.
+ # For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
+ # For amqp_rpc_single_reply_queue = True,
+ # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
+ _add_unique_id(msg)
pack_context(msg, context)
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ # TODO(pekowski): Remove this flag and the code under the if clause
+ # in Havana.
+ if not conf.amqp_rpc_single_reply_queue:
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ else:
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
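The single-reply-queue fast path above is opt-in; a minimal sketch of enabling it programmatically (set_override is standard oslo.config API):

    from oslo.config import cfg

    # Equivalent to amqp_rpc_single_reply_queue = True in the config file.
    cfg.CONF.set_override('amqp_rpc_single_reply_queue', True)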
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
- msg = rpc_common.serialize_msg(msg, force_envelope=True)
+ msg = rpc_common.serialize_msg(msg)
conn.notify_send(topic, msg)
import traceback
from oslo.config import cfg
+import six
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
The current message format (version 2.0) is very simple. It is:
{
- 'heat.version': <RPC Envelope Version as a String>,
- 'heat.message': <Application Message Payload, JSON encoded>
+ 'oslo.version': <RPC Envelope Version as a String>,
+ 'oslo.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
'''
_RPC_ENVELOPE_VERSION = '2.0'
-_VERSION_KEY = 'heat.version'
-_MESSAGE_KEY = 'heat.message'
-
-
-# TODO(russellb) Turn this on after Grizzly.
-_SEND_RPC_ENVELOPE = False
+_VERSION_KEY = 'oslo.version'
+_MESSAGE_KEY = 'oslo.message'
class RPCException(Exception):
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _("Timeout while waiting on RPC response.")
+ message = _('Timeout while waiting on RPC response - '
+ 'topic: "%(topic)s", RPC method: "%(method)s" '
+ 'info: "%(info)s"')
+
+ def __init__(self, info=None, topic=None, method=None):
+ """
+ :param info: Extra info to convey to the user
+ :param topic: The topic that the rpc call was sent to
+        :param method: The name of the rpc method being called
+ """
+ self.info = info
+ self.topic = topic
+ self.method = method
+ super(Timeout, self).__init__(
+ None,
+ info=info or _('<unknown>'),
+ topic=topic or _('<unknown>'),
+ method=method or _('<unknown>'))
+
+
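Assuming RPCException formats its message attribute with the supplied keyword arguments (as elsewhere in this module), the richer Timeout reads roughly like this (values illustrative):

    raise Timeout(topic='engine', method='list_stacks')
    # Timeout while waiting on RPC response - topic: "engine",
    # RPC method: "list_stacks" info: "<unknown>"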
+class DuplicateMessageError(RPCException):
+ message = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
"not supported by this endpoint.")
+class RpcVersionCapError(RPCException):
+ message = _("Specified RPC version cap, %(version_cap)s, is too low")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
"""
raise NotImplementedError()
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+ A message will be delivered to multiple pools, if more than
+ one is created.
+
+ :param callback: Callable to be invoked for each message.
+ :type callback: callable accepting one argument
+ :param pool_name: The name of the consumer pool.
+ :type pool_name: str
+ :param topic: The routing topic for desired messages.
+ :type topic: str
+ :param exchange_name: The name of the message exchange where
+ the client should attach. Defaults to
+ the configured exchange.
+ :type exchange_name: str
+ """
+ raise NotImplementedError()
+
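A hedged sketch of the new consumer-pool API from the service side (names are illustrative; exactly one member of the pool receives each message):

    from heat.openstack.common import rpc

    def on_notification(message_data):
        print('got %r' % message_data)

    conn = rpc.create_connection()
    conn.join_consumer_pool(callback=on_notification,
                            pool_name='listeners',
                            topic='notifications.info',
                            exchange_name='heat')
    conn.consume_in_thread()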
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
- except KeyError, e:
+ except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(_("Returning exception %s to caller"),
+ six.text_type(failure))
LOG.error(tb)
kwargs = {}
data = {
'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__),
- 'message': unicode(failure),
+ 'message': six.text_type(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
- failure = klass(**failure.get('kwargs', {}))
+ failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
- except Exception, e:
+ except Exception as e:
if type(e) in exceptions:
raise ClientException()
else:
return True
-def serialize_msg(raw_msg, force_envelope=False):
- if not _SEND_RPC_ENVELOPE and not force_envelope:
- return raw_msg
-
+def serialize_msg(raw_msg):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
"""
from heat.openstack.common.rpc import common as rpc_common
+from heat.openstack.common.rpc import serializer as rpc_serializer
class RpcDispatcher(object):
contains a list of underlying managers that have an API_VERSION attribute.
"""
- def __init__(self, callbacks):
+ def __init__(self, callbacks, serializer=None):
"""Initialize the rpc dispatcher.
:param callbacks: List of proxy objects that are an instance
of a class with rpc methods exposed. Each proxy
object should have an RPC_API_VERSION attribute.
+ :param serializer: The Serializer object that will be used to
+ deserialize arguments before the method call and
+ to serialize the result after it returns.
"""
self.callbacks = callbacks
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcDispatcher, self).__init__()
- def dispatch(self, ctxt, version, method, **kwargs):
+ def _deserialize_args(self, context, kwargs):
+ """Helper method called to deserialize args before dispatch.
+
+ This calls our serializer on each argument, returning a new set of
+ args that have been deserialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to be deserialized
+ :returns: A new set of deserialized args
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.deserialize_entity(context,
+ arg)
+ return new_kwargs
+
+ def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
:param ctxt: The request context
:param version: The requested API version from the incoming message
:param method: The method requested to be called by the incoming
message.
+ :param namespace: The namespace for the requested method. If None,
+ the dispatcher will look for a method on a callback
+ object with no namespace set.
:param kwargs: A dict of keyword arguments to be passed to the method.
:returns: Whatever is returned by the underlying method that gets
had_compatible = False
for proxyobj in self.callbacks:
- if hasattr(proxyobj, 'RPC_API_VERSION'):
+ # Check for namespace compatibility
+ try:
+ cb_namespace = proxyobj.RPC_API_NAMESPACE
+ except AttributeError:
+ cb_namespace = None
+
+ if namespace != cb_namespace:
+ continue
+
+ # Check for version compatibility
+ try:
rpc_api_version = proxyobj.RPC_API_VERSION
- else:
+ except AttributeError:
rpc_api_version = '1.0'
+
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
+
if not hasattr(proxyobj, method):
continue
if is_compatible:
- return getattr(proxyobj, method)(ctxt, **kwargs)
+ kwargs = self._deserialize_args(ctxt, kwargs)
+ result = getattr(proxyobj, method)(ctxt, **kwargs)
+ return self.serializer.serialize_entity(ctxt, result)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC
+# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
self.topic = topic
self.proxy = proxy
- def call(self, context, version, method, args, timeout):
+ def call(self, context, version, method, namespace, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = self.proxy.dispatch(context, version, method, **args)
+ rval = self.proxy.dispatch(context, version, method,
+ namespace, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, version, method, args, timeout)
+ return consumer.call(context, version, method, namespace, args,
+ timeout)
def call(conf, context, topic, msg, timeout=None):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, version, method, args, None)
+ consumer.call(context, version, method, namespace, args, None)
except Exception:
pass
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC
+# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
- help='the RabbitMQ password'),
+ help='the RabbitMQ password',
+ secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
- message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
+ finally:
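+            # Ack in all cases, so an unprocessable ("poison") message
+            # is skipped rather than redelivered forever.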
+ message.ack()
self.queue.consume(*args, callback=_callback, **options)
"""Cancel the consuming from the queue, if it has started"""
try:
self.queue.cancel(self.tag)
- except KeyError, e:
+ except KeyError as e:
# NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag:
raise
"""
# Default options
options = {'durable': False,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
return
except (IOError, self.connection_errors) as e:
pass
- except Exception, e:
+ except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
while True:
try:
return method(*args, **kwargs)
- except (self.connection_errors, socket.timeout, IOError), e:
+ except (self.connection_errors, socket.timeout, IOError) as e:
if error_callback:
error_callback(e)
- except Exception, e:
+ except Exception as e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
def _error_callback(exc):
if isinstance(exc, socket.timeout):
- LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ LOG.debug(_('Timed out waiting for RPC response: %s') %
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools if more than
+        one is created.
+ """
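+        # The pool name doubles as the queue name: members of one pool
+        # share a single queue on the topic, while each distinct pool
+        # gets its own queue and thus its own copy of every message.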
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+ self.declare_topic_consumer(
+ queue_name=pool_name,
+ topic=topic,
+ exchange_name=exchange_name,
+ callback=callback_wrapper,
+ )
+
def create_connection(conf, new=True):
"""Create a connection"""
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC
+# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
cfg.StrOpt('qpid_hostname',
default='localhost',
help='Qpid broker hostname'),
- cfg.StrOpt('qpid_port',
- default='5672',
+ cfg.IntOpt('qpid_port',
+ default=5672,
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
- help='Password for qpid connection'),
+ help='Password for qpid connection',
+ secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
- self.connection.protocol = self.conf.qpid_protocol
+ self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _register_consumer(self, consumer):
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues"""
- if self.connection.opened():
- try:
- self.connection.close()
- except qpid_exceptions.ConnectionError:
- pass
-
attempt = 0
delay = 1
while True:
+ # Close the session if necessary
+ if self.connection.opened():
+ try:
+ self.connection.close()
+ except qpid_exceptions.ConnectionError:
+ pass
+
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
try:
self.connection_create(broker)
self.connection.open()
- except qpid_exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError as e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
try:
return method(*args, **kwargs)
except (qpid_exceptions.Empty,
- qpid_exceptions.ConnectionError), e:
+ qpid_exceptions.ConnectionError) as e:
if error_callback:
error_callback(e)
self.reconnect()
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
- LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ LOG.debug(_('Timed out waiting for RPC response: %s') %
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
return consumer
+ def join_consumer_pool(self, callback, pool_name, topic,
+ exchange_name=None):
+ """Register as a member of a group of consumers for a given topic from
+ the specified exchange.
+
+ Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools if more than
+        one is created.
+ """
+ callback_wrapper = rpc_amqp.CallbackWrapper(
+ conf=self.conf,
+ callback=callback,
+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
+ Connection),
+ )
+ self.proxy_callbacks.append(callback_wrapper)
+
+ consumer = TopicConsumer(conf=self.conf,
+ session=self.session,
+ topic=topic,
+ callback=callback_wrapper,
+ name=pool_name,
+ exchange_name=exchange_name)
+
+ self._register_consumer(consumer)
+ return consumer
+
def create_connection(conf, new=True):
"""Create a connection"""
import os
import pprint
+import re
import socket
import sys
import types
import greenlet
from oslo.config import cfg
+from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
Error if a developer passes us bad data.
"""
try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
+ return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
- LOG.error(_("JSON serialization failed."))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("JSON serialization failed."))
def _deserialize(data):
return
# We must unsubscribe, or we'll leak descriptors.
- if len(self.subscriptions) > 0:
+ if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
- if serialize:
- data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ def cast(self, msg_id, topic, data, envelope=False):
+ msg_id = msg_id or 0
+
+ if not envelope:
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
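+        # serialize_msg() returns a dict of envelope fields; flatten it
+        # into an alternating key/value tuple, e.g. {k1: v1, k2: v2}
+        # becomes (k1, v1, k2, v2). unflatten_envelope() reverses this
+        # on the receiving side.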
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
try:
result = proxy.dispatch(
- ctx, data['version'], data['method'], **data['args'])
+ ctx, data['version'], data['method'],
+ data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
- except rpc_common.ClientException, e:
+ except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
- # Our real method is curried into msg['args']
+ # NOTE(ewindisch): context kwarg exists for Grizzly compat.
+ # this may be able to be removed earlier than
+ # 'I' if ConsumerBase.process were refactored.
+ if type(msg) is list:
+ payload = msg[-1]
+ else:
+ payload = msg
- child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
- self._get_response(child_ctx, proxy, topic, msg[1]),
+ self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
return
proxy.dispatch(ctx, data['version'],
- data['method'], **data['args'])
+ data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
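+        # Topics received over the network are used to build IPC socket
+        # paths below, so precompile a pattern matching any path
+        # separator character; such topics are rejected as dangerous.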
+ pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
+ self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
self.topic_proxy = {}
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
- topic = topic.split('.', 1)[0]
+ topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
+ # The topic is received over the network,
+ # don't trust this input.
+ if self.badchars.search(topic) is not None:
+ emsg = _("Topic contained dangerous characters.")
+ LOG.warn(emsg)
+ raise RPCException(emsg)
+
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
- LOG.error(_("Could not create IPC directory %s") %
- (ipc_dir, ))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
try:
self.register(consumption_proxy,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
- LOG.error(_("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use."))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
- ctx = RpcContext.unmarshal(ctx)
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- proxy = self.proxies[sock]
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
+
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
+
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
+ # Register with matchmaker.
+ _get_matchmaker().register(topic, CONF.rpc_zmq_host)
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
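+            # Consume on a host-qualified topic ("topic.host") so the
+            # matchmaker can address this host directly.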
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
+ _get_matchmaker().stop_heartbeat()
+ for topic in self.topics:
+ _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
def consume_in_thread(self):
+ _get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None,
- serialize=True, force_envelope=False):
+def _call(addr, context, topic, msg, timeout=None,
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
'method': '-reply',
'args': {
'msg_id': msg_id,
- 'context': mcontext,
'topic': reply_topic,
+ # TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload,
- serialize=serialize, force_envelope=force_envelope)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
- if len(queues) == 0:
+ if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
- raise rpc_common.Timeout, "No match from matchmaker."
+ raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize,
- force_envelope)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout,
- serialize, force_envelope)
+ return method(_addr, context, _topic, msg, timeout,
+ envelope)
def create_connection(conf, new=True):
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
-def notify(conf, context, topic, msg, **kwargs):
+def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
- topic.replace('.', '-')
- kwargs['serialize'] = kwargs.pop('envelope')
- kwargs['force_envelope'] = True
- cast(conf, context, topic, msg, **kwargs)
+ topic = topic.replace('.', '-')
+ cast(conf, context, topic, msg, envelope=envelope)
def cleanup():
return ZMQ_CTX
-def _get_matchmaker():
+def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
- matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
+ mm = CONF.rpc_zmq_matchmaker
+ if mm.endswith('matchmaker.MatchMakerRing'):
+            mm = mm.replace('matchmaker', 'matchmaker_ring')
+ LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
+ ' %(new)s instead') % dict(
+ orig=CONF.rpc_zmq_matchmaker, new=mm))
+ matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker
"""
import contextlib
-import itertools
-import json
+import eventlet
from oslo.config import cfg
from heat.openstack.common.gettextutils import _
matchmaker_opts = [
- # Matchmaker ring file
- cfg.StrOpt('matchmaker_ringfile',
- default='/etc/nova/matchmaker_ring.json',
- help='Matchmaker ring file (JSON)'),
+ cfg.IntOpt('matchmaker_heartbeat_freq',
+ default=300,
+ help='Heartbeat frequency'),
+ cfg.IntOpt('matchmaker_heartbeat_ttl',
+ default=600,
+ help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
class MatchMakerBase(object):
- """Match Maker Base Class."""
-
+ """
+ Match Maker Base Class.
+ Build off HeartbeatMatchMakerBase if building a
+ heartbeat-capable MatchMaker.
+ """
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
+ self.no_heartbeat_msg = _('Matchmaker does not implement '
+ 'registration or heartbeat.')
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+        Heartbeats, if applicable, may keep the registration alive.
+ """
+ pass
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a key.host is alive.
+ Used internally for updating heartbeats,
+        but may also be used publicly to acknowledge that
+        a system is alive (i.e. an rpc message was successfully
+        sent to the host).
+ """
+ pass
+
+ def is_alive(self, topic, host):
+ """
+ Checks if a host is alive.
+ """
+ pass
+
+ def expire(self, topic, host):
+ """
+ Explicitly expire a host's registration.
+ """
+ pass
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ pass
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ pass
+
+ def start_heartbeat(self):
+ """
+ Spawn heartbeat greenthread.
+ """
+ pass
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ pass
+
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
return workers
+class HeartbeatMatchMakerBase(MatchMakerBase):
+ """
+ Base for a heart-beat capable MatchMaker.
+ Provides common methods for registering,
+ unregistering, and maintaining heartbeats.
+ """
+ def __init__(self):
+ self.hosts = set()
+ self._heart = None
+ self.host_topic = {}
+
+ super(HeartbeatMatchMakerBase, self).__init__()
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ for key, host in self.host_topic:
+ self.ack_alive(key, host)
+
+ def ack_alive(self, key, host):
+ """
+        Acknowledge that a key.host is alive.
+        Used internally for updating heartbeats,
+        but may also be used publicly to acknowledge that
+        a system is alive (i.e. an rpc message was successfully
+        sent to the host).
+ """
+ raise NotImplementedError("Must implement ack_alive")
+
+ def backend_register(self, key, host):
+ """
+ Implements registration logic.
+        Called by register(self, key, host).
+ """
+ raise NotImplementedError("Must implement backend_register")
+
+ def backend_unregister(self, key, key_host):
+ """
+ Implements de-registration logic.
+        Called by unregister(self, key, host).
+ """
+ raise NotImplementedError("Must implement backend_unregister")
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+        Heartbeats, if applicable, may keep the registration alive.
+ """
+ self.hosts.add(host)
+ self.host_topic[(key, host)] = host
+ key_host = '.'.join((key, host))
+
+ self.backend_register(key, key_host)
+
+ self.ack_alive(key, host)
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ if (key, host) in self.host_topic:
+ del self.host_topic[(key, host)]
+
+ self.hosts.discard(host)
+ self.backend_unregister(key, '.'.join((key, host)))
+
+ LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
+ {'key': key, 'host': host})
+
+ def start_heartbeat(self):
+ """
+ Implementation of MatchMakerBase.start_heartbeat
+ Launches greenthread looping send_heartbeats(),
+ yielding for CONF.matchmaker_heartbeat_freq seconds
+ between iterations.
+ """
+ if not self.hosts:
+ raise MatchMakerException(
+ _("Register before starting heartbeat."))
+
+ def do_heartbeat():
+ while True:
+ self.send_heartbeats()
+ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+ self._heart = eventlet.spawn(do_heartbeat)
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ if self._heart:
+ self._heart.kill()
+
+
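+# Typical lifecycle for a heartbeat-capable matchmaker (illustrative
+# sketch; mm is any HeartbeatMatchMakerBase implementation):
+#
+#     mm.register('compute', 'host1')   # backend stores 'compute.host1'
+#     mm.start_heartbeat()              # greenthread loops send_heartbeats()
+#     ...
+#     mm.stop_heartbeat()
+#     mm.unregister('compute', 'host1')
+
+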
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
return [(key, None)]
-class RingExchange(Exchange):
- """
- Match Maker where hosts are loaded from a static file containing
- a hashmap (JSON formatted).
-
- __init__ takes optional ring dictionary argument, otherwise
- loads the ringfile from CONF.mathcmaker_ringfile.
- """
- def __init__(self, ring=None):
- super(RingExchange, self).__init__()
-
- if ring:
- self.ring = ring
- else:
- fh = open(CONF.matchmaker_ringfile, 'r')
- self.ring = json.load(fh)
- fh.close()
-
- self.ring0 = {}
- for k in self.ring.keys():
- self.ring0[k] = itertools.cycle(self.ring[k])
-
- def _ring_has(self, key):
- if key in self.ring0:
- return True
- return False
-
-
-class RoundRobinRingExchange(RingExchange):
- """A Topic Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(RoundRobinRingExchange, self).__init__(ring)
-
- def run(self, key):
- if not self._ring_has(key):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (key, )
- )
- return []
- host = next(self.ring0[key])
- return [(key + '.' + host, host)]
-
-
-class FanoutRingExchange(RingExchange):
- """Fanout Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(FanoutRingExchange, self).__init__(ring)
-
- def run(self, key):
- # Assume starts with "fanout~", strip it for lookup.
- nkey = key.split('fanout~')[1:][0]
- if not self._ring_has(nkey):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
- )
- return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
- def __init__(self):
+ def __init__(self, host='localhost'):
+ self.host = host
super(Exchange, self).__init__()
def run(self, key):
- return [(key.split('.')[0] + '.localhost', 'localhost')]
+ return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
- i.e. "compute.host" sends a message to "compute" running on "host"
+ i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
- b, e = key.split('.', 1)
- return [(b, e)]
-
-
-class MatchMakerRing(MatchMakerBase):
- """
- Match Maker where hosts are loaded from a static hashmap.
- """
- def __init__(self, ring=None):
- super(MatchMakerRing, self).__init__()
- self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
- self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
+ e = key.split('.', 1)[1]
+ return [(key, e)]
class MatchMakerLocalhost(MatchMakerBase):
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
- def __init__(self):
+ def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(FanoutBinding(), LocalhostExchange())
+ self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), LocalhostExchange())
+ self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+from oslo.config import cfg
+
+from heat.openstack.common import importutils
+from heat.openstack.common import log as logging
+from heat.openstack.common.rpc import matchmaker as mm_common
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+ cfg.StrOpt('host',
+ default='127.0.0.1',
+ help='Host to locate redis'),
+ cfg.IntOpt('port',
+ default=6379,
+ help='Use this port to connect to redis host.'),
+ cfg.StrOpt('password',
+ default=None,
+ help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+ title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+ def __init__(self, matchmaker):
+ self.matchmaker = matchmaker
+ self.redis = matchmaker.redis
+ super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def run(self, topic):
+ while True:
+ member_name = self.redis.srandmember(topic)
+
+ if not member_name:
+ # If this happens, there are no
+ # longer any members.
+ break
+
+ if not self.matchmaker.is_alive(topic, member_name):
+ continue
+
+ host = member_name.split('.', 1)[1]
+ return [(member_name, host)]
+ return []
+
+
+class RedisFanoutExchange(RedisExchange):
+ """
+    Fanout exchange: return all live hosts registered for the topic.
+ """
+ def run(self, topic):
+ topic = topic.split('~', 1)[1]
+ hosts = self.redis.smembers(topic)
+ good_hosts = filter(
+ lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+ return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+ """
+ MatchMaker registering and looking-up hosts with a Redis server.
+ """
+ def __init__(self):
+ super(MatchMakerRedis, self).__init__()
+
+ if not redis:
+ raise ImportError("Failed to import module redis.")
+
+ self.redis = redis.StrictRedis(
+ host=CONF.matchmaker_redis.host,
+ port=CONF.matchmaker_redis.port,
+ password=CONF.matchmaker_redis.password)
+
+ self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+ self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+ self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
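+    # Redis data model used below: each topic key is a set whose
+    # members are 'topic.host' strings, and each 'topic.host' also
+    # exists as a standalone key whose TTL acts as the liveness
+    # heartbeat.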
+ def ack_alive(self, key, host):
+ topic = "%s.%s" % (key, host)
+ if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+ # If we could not update the expiration, the key
+ # might have been pruned. Re-register, creating a new
+ # key in Redis.
+            self.register(key, host)
+
+ def is_alive(self, topic, host):
+ if self.redis.ttl(host) == -1:
+ self.expire(topic, host)
+ return False
+ return True
+
+ def expire(self, topic, host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.delete(host)
+ pipe.srem(topic, host)
+ pipe.execute()
+
+ def backend_register(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.sadd(key, key_host)
+
+ # No value is needed, we just
+ # care if it exists. Sets aren't viable
+ # because only keys can expire.
+ pipe.set(key_host, '')
+
+ pipe.execute()
+
+ def backend_unregister(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.srem(key, key_host)
+ pipe.delete(key_host)
+ pipe.execute()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011-2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import itertools
+import json
+
+from oslo.config import cfg
+
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+from heat.openstack.common.rpc import matchmaker as mm
+
+
+matchmaker_opts = [
+ # Matchmaker ring file
+ cfg.StrOpt('ringfile',
+ deprecated_name='matchmaker_ringfile',
+ deprecated_group='DEFAULT',
+ default='/etc/oslo/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
+LOG = logging.getLogger(__name__)
+
+
+class RingExchange(mm.Exchange):
+ """
+ Match Maker where hosts are loaded from a static file containing
+ a hashmap (JSON formatted).
+
+    __init__ takes an optional ring dictionary argument; otherwise
+    it loads the ringfile from CONF.matchmaker_ring.ringfile.
+ """
+ def __init__(self, ring=None):
+ super(RingExchange, self).__init__()
+
+ if ring:
+ self.ring = ring
+ else:
+ fh = open(CONF.matchmaker_ring.ringfile, 'r')
+ self.ring = json.load(fh)
+ fh.close()
+
+ self.ring0 = {}
+ for k in self.ring.keys():
+ self.ring0[k] = itertools.cycle(self.ring[k])
+
+ def _ring_has(self, key):
+ if key in self.ring0:
+ return True
+ return False
+
+
+class RoundRobinRingExchange(RingExchange):
+ """A Topic Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(RoundRobinRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ if not self._ring_has(key):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (key, )
+ )
+ return []
+ host = next(self.ring0[key])
+ return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(FanoutRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "fanout~", strip it for lookup.
+ nkey = key.split('fanout~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class MatchMakerRing(mm.MatchMakerBase):
+ """
+ Match Maker where hosts are loaded from a static hashmap.
+ """
+ def __init__(self, ring=None):
+ super(MatchMakerRing, self).__init__()
+ self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
+ self.add_binding(mm.DirectBinding(), mm.DirectExchange())
+ self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2012 Red Hat, Inc.
+# Copyright 2012-2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
from heat.openstack.common import rpc
+from heat.openstack.common.rpc import common as rpc_common
+from heat.openstack.common.rpc import serializer as rpc_serializer
class RpcProxy(object):
rpc API.
"""
- def __init__(self, topic, default_version):
+    # The default namespace, which can be overridden in a subclass.
+ RPC_API_NAMESPACE = None
+
+ def __init__(self, topic, default_version, version_cap=None,
+ serializer=None):
"""Initialize an RpcProxy.
:param topic: The topic to use for all messages.
:param default_version: The default API version to request in all
outgoing messages. This can be overridden on a per-message
basis.
+ :param version_cap: Optionally cap the maximum version used for sent
+ messages.
+    :param serializer: Optionally (de-)serialize entities with a
+ provided helper.
"""
self.topic = topic
self.default_version = default_version
+ self.version_cap = version_cap
+ if serializer is None:
+ serializer = rpc_serializer.NoOpSerializer()
+ self.serializer = serializer
super(RpcProxy, self).__init__()
def _set_version(self, msg, vers):
:param msg: The message having a version added to it.
:param vers: The version number to add to the message.
"""
- msg['version'] = vers if vers else self.default_version
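+        # A capped proxy refuses to send any message requesting a
+        # version newer than the cap, which lets deployers pin
+        # outgoing RPC versions during rolling upgrades.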
+ v = vers if vers else self.default_version
+ if (self.version_cap and not
+ rpc_common.version_is_compatible(self.version_cap, v)):
+ raise rpc_common.RpcVersionCapError(version=self.version_cap)
+ msg['version'] = v
def _get_topic(self, topic):
"""Return the topic to use for a message."""
return topic if topic else self.topic
@staticmethod
- def make_msg(method, **kwargs):
- return {'method': method, 'args': kwargs}
+ def make_namespaced_msg(method, namespace, **kwargs):
+ return {'method': method, 'namespace': namespace, 'args': kwargs}
+
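+    # For example, with the default RPC_API_NAMESPACE of None,
+    # make_msg('list_stacks') produces:
+    #     {'method': 'list_stacks', 'namespace': None, 'args': {}}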
+ def make_msg(self, method, **kwargs):
+ return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
+ **kwargs)
+
+ def _serialize_msg_args(self, context, kwargs):
+ """Helper method called to serialize message arguments.
+
+ This calls our serializer on each argument, returning a new
+ set of args that have been serialized.
+
+ :param context: The request context
+ :param kwargs: The arguments to serialize
+ :returns: A new set of serialized arguments
+ """
+ new_kwargs = dict()
+ for argname, arg in kwargs.iteritems():
+ new_kwargs[argname] = self.serializer.serialize_entity(context,
+ arg)
+ return new_kwargs
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
- return rpc.call(context, self._get_topic(topic), msg, timeout)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ real_topic = self._get_topic(topic)
+ try:
+ result = rpc.call(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
+        except rpc_common.Timeout as exc:
+            raise rpc_common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
- return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
+ real_topic = self._get_topic(topic)
+ try:
+ result = rpc.multicall(context, real_topic, msg, timeout)
+ return self.serializer.deserialize_entity(context, result)
+        except rpc_common.Timeout as exc:
+            raise rpc_common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.
remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
from the remote method.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
return values.
"""
self._set_version(msg, version)
+ msg['args'] = self._serialize_msg_args(context, msg['args'])
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)
--- /dev/null
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provides the definition of an RPC serialization handler"""
+
+import abc
+
+
+class Serializer(object):
+ """Generic (de-)serialization definition base class"""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def serialize_entity(self, context, entity):
+ """Serialize something to primitive form.
+
+ :param context: Security context
+ :param entity: Entity to be serialized
+ :returns: Serialized form of entity
+ """
+ pass
+
+ @abc.abstractmethod
+ def deserialize_entity(self, context, entity):
+ """Deserialize something from primitive form.
+
+ :param context: Security context
+ :param entity: Primitive to be deserialized
+ :returns: Deserialized form of entity
+ """
+ pass
+
+
+class NoOpSerializer(Serializer):
+ """A serializer that does nothing"""
+
+ def serialize_entity(self, context, entity):
+ return entity
+
+ def deserialize_entity(self, context, entity):
+ return entity
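+
+
+# A concrete serializer converts rich objects to and from primitives;
+# an illustrative sketch (not part of this module) for datetime
+# entities might look like:
+#
+#     class DatetimeSerializer(Serializer):
+#         def serialize_entity(self, context, entity):
+#             return timeutils.strtime(entity)
+#
+#         def deserialize_entity(self, context, entity):
+#             return timeutils.parse_strtime(entity)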
--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import sys
+
+from oslo.config import cfg
+
+from heat.openstack.common import log as logging
+from heat.openstack.common import rpc
+from heat.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+ CONF(sys.argv[1:], project='oslo')
+ logging.setup("oslo")
+
+ with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+ reactor.consume_in_thread()
+        reactor.wait()
+
+
+if __name__ == '__main__':
+    main()
"""Generic Node base class for all workers that run on hosts."""
import errno
-import logging as std_logging
import os
import random
import signal
import time
import eventlet
+import logging as std_logging
from oslo.config import cfg
from heat.openstack.common import eventlet_backdoor
"""
self._services = threadgroup.ThreadGroup()
- eventlet_backdoor.initialize_if_enabled()
+ self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
:returns: None
"""
+ service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
def stop(self):
self.threads = []
self.timers = []
+ def add_dynamic_timer(self, callback, initial_delay=None,
+ periodic_interval_max=None, *args, **kwargs):
+ timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
+ timer.start(initial_delay=initial_delay,
+ periodic_interval_max=periodic_interval_max)
+ self.timers.append(timer)
+
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2011 OpenStack LLC.
+# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import iso8601
-TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
-PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
+# ISO 8601 extended time format with microseconds
+_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
+_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
+PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
-def isotime(at=None):
+def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
- str = at.strftime(TIME_FORMAT)
+ st = at.strftime(_ISO8601_TIME_FORMAT
+ if not subsecond
+ else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
- str += ('Z' if tz == 'UTC' else tz)
- return str
+ st += ('Z' if tz == 'UTC' else tz)
+ return st
def parse_isotime(timestr):
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
- return normalize_time(dt) < soon
+ return normalize_time(dt) <= soon
return self.call(ctxt, self.make_msg('show_watch',
watch_name=watch_name))
- def show_watch_metric(self, ctxt, namespace=None, metric_name=None):
+    def show_watch_metric(self, ctxt, metric_namespace=None,
+                          metric_name=None):
"""
The show_watch_metric method returns the datapoints associated
with a specified metric, or all metrics if no metric_name is passed
:param ctxt: RPC context.
- :param namespace: Name of the namespace you want to see,
+ :param metric_namespace: Name of the namespace you want to see,
or None to see all
:param metric_name: Name of the metric you want to see,
or None to see all
"""
return self.call(ctxt, self.make_msg('show_watch_metric',
- namespace=namespace,
+ metric_namespace=metric_namespace,
metric_name=metric_name))
def set_watch_state(self, ctxt, watch_name, state):
u'stack_status': u'CREATE_COMPLETE'}]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_stacks',
+ {'namespace': None,
+ 'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndReturn(engine_resp)
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_stacks',
+ {'namespace': None,
+ 'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("AttributeError"))
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_stacks',
+ {'namespace': None,
+ 'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("Exception"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("InvalidTenant"))
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("AttributeError"))
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': engine_parms,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'update_stack',
+ {'namespace': None,
+ 'method': 'update_stack',
'args': {'stack_identity': identity,
'template': template,
'params': engine_parms,
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'get_template',
+ {'namespace': None,
+ 'method': 'get_template',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'get_template',
+ {'namespace': None,
+ 'method': 'get_template',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("AttributeError"))
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'get_template',
+ {'namespace': None,
+ 'method': 'get_template',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
# Engine returns None when delete successful
rpc.call(dummy_req.context, self.topic,
- {'method': 'delete_stack',
+ {'namespace': None,
+ 'method': 'delete_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(None)
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
rpc.call(dummy_req.context, self.topic,
- {'method': 'delete_stack',
+ {'namespace': None,
+ 'method': 'delete_stack',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("AttributeError"))
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': identity},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("Exception"))
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
args = {
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': args,
'version': self.api_version}, None).AndReturn(engine_resp)
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
args = {
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': args,
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("ResourceNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
args = {
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
- {'method': 'describe_stack_resources',
+ {'namespace': None,
+ 'method': 'describe_stack_resources',
'args': args,
'version': self.api_version}, None).AndReturn(engine_resp)
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'find_physical_resource',
+ {'namespace': None,
+ 'method': 'find_physical_resource',
'args': {'physical_resource_id':
'a3455d8c-9f88-404d-a85b-5315293e67de'},
'version': self.api_version}, None).AndReturn(identity)
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
- {'method': 'describe_stack_resources',
+ {'namespace': None,
+ 'method': 'describe_stack_resources',
'args': args,
'version': self.api_version}, None).AndReturn(engine_resp)
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'find_physical_resource',
+ {'namespace': None,
+ 'method': 'find_physical_resource',
'args': {'physical_resource_id':
'aaaaaaaa-9f88-404d-cccc-ffffffffffff'},
'version': self.api_version},
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_stack_resources',
+ {'namespace': None,
+ 'method': 'list_stack_resources',
'args': {'stack_identity': identity},
'version': self.api_version}, None).AndReturn(engine_resp)
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version}, None
).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(dummy_req.context, self.topic,
- {'args': {'watch_name': watch_name},
+ {'namespace': None,
+ 'args': {'watch_name': watch_name},
'method': 'show_watch',
'version': self.api_version},
None).AndReturn(engine_resp)
- # and pass None/None for namespace/watch_name which returns
- # all metric data which we post-process in the API
+ # and pass None/None for metric_namespace/metric_name, which
+ # returns all metric data that we post-process in the API
rpc.call(dummy_req.context, self.topic,
- {'args': {'namespace': None, 'metric_name': None},
+ {'namespace': None,
+ 'args': {'metric_namespace': None, 'metric_name': None},
'method': 'show_watch_metric',
'version': self.api_version},
None).AndReturn(engine_resp)
- # and pass None/None for namespace/watch_name which returns
- # all metric data which we post-process in the API
+ # and pass None/None for metric_namespace/metric_name, which
+ # returns all metric data that we post-process in the API
rpc.call(dummy_req.context, self.topic, {'args':
- {'namespace': None,
- 'metric_name': None},
- 'method': 'show_watch_metric', 'version': self.api_version},
+ {'metric_namespace': None,
+ 'metric_name': None},
+ 'namespace': None,
+ 'method': 'show_watch_metric',
+ 'version': self.api_version},
None).AndReturn(engine_resp)
self.m.ReplayAll()
# Current engine implementation means we filter in the API
- # and pass None/None for namespace/watch_name which returns
- # all metric data which we post-process in the API
+ # and pass None/None for metric_namespace/metric_name, which
+ # returns all metric data that we post-process in the API
- rpc.call(dummy_req.context, self.topic, {'args':
- {'namespace': None,
- 'metric_name': None},
- 'method': 'show_watch_metric', 'version': self.api_version},
+ rpc.call(dummy_req.context, self.topic,
+ {'args': {'metric_namespace': None, 'metric_name': None},
+ 'namespace': None,
+ 'method': 'show_watch_metric',
+ 'version': self.api_version},
None).AndReturn(engine_resp)
self.m.ReplayAll()
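Since the engine cannot yet filter by namespace or metric, the API filters the full datapoint list itself. A sketch of that post-processing, assuming each datapoint dict carries 'namespace' and 'metric_name' keys (the field names are an assumption; this diff does not show them):

def filter_metrics(datapoints, metric_namespace=None, metric_name=None):
    # None means "match everything", mirroring the engine interface
    def matches(d):
        return ((metric_namespace is None or
                 d.get('namespace') == metric_namespace) and
                (metric_name is None or
                 d.get('metric_name') == metric_name))
    return [d for d in datapoints if matches(d)]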
'Unit': u'Count',
'Dimensions': []}},
'watch_name': u'HttpFailureAlarm'},
+ 'namespace': None,
'method': 'create_watch_data',
'version': self.api_version},
None).AndReturn(engine_resp)
{'args':
{'state': state_map[state],
'watch_name': u'HttpFailureAlarm'},
+ 'namespace': None,
'method': 'set_watch_state',
'version': self.api_version},
None).AndReturn(engine_resp)
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_stacks',
+ {'namespace': None,
+ 'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_stacks',
+ {'namespace': None,
+ 'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_stacks',
+ {'namespace': None,
+ 'method': 'list_stacks',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("Exception"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': identity.stack_name,
'template': template,
'params': parameters,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': parameters,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': parameters,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'create_stack',
+ {'namespace': None,
+ 'method': 'create_stack',
'args': {'stack_name': stack_name,
'template': template,
'params': parameters,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': identity.stack_name},
'version': self.api_version},
None).AndReturn(identity)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': identity.stack_name},
'version': self.api_version},
None).AndReturn(identity)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'identify_stack',
+ {'namespace': None,
+ 'method': 'identify_stack',
'args': {'stack_name': stack_name},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'show_stack',
+ {'namespace': None,
+ 'method': 'show_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("InvalidTenant"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'get_template',
+ {'namespace': None,
+ 'method': 'get_template',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndReturn(template)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'get_template',
+ {'namespace': None,
+ 'method': 'get_template',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'update_stack',
+ {'namespace': None,
+ 'method': 'update_stack',
'args': {'stack_identity': dict(identity),
'template': template,
'params': parameters,
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'update_stack',
+ {'namespace': None,
+ 'method': 'update_stack',
'args': {'stack_identity': dict(identity),
'template': template,
'params': parameters,
self.m.StubOutWithMock(rpc, 'call')
# Engine returns None when delete successful
rpc.call(req.context, self.topic,
- {'method': 'delete_stack',
+ {'namespace': None,
+ 'method': 'delete_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndReturn(None)
self.m.StubOutWithMock(rpc, 'call')
# Engine returns None when delete successful
rpc.call(req.context, self.topic,
- {'method': 'delete_stack',
+ {'namespace': None,
+ 'method': 'delete_stack',
'args': {'stack_identity': dict(identity)},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
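Both delete expectations encode the engine's calling convention: a None result means the delete was accepted, and failures surface as RemoteError. In outline (a sketch, not the actual API handler):

def delete_stack(ctx, topic, identity, api_version):
    result = rpc.call(ctx, topic,
                      {'namespace': None,
                       'method': 'delete_stack',
                       'args': {'stack_identity': identity},
                       'version': api_version}, None)
    # engine signals success with None; RemoteError("StackNotFound")
    # is mapped to the corresponding heat exception by the caller
    return result is None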
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'validate_template',
+ {'namespace': None,
+ 'method': 'validate_template',
'args': {'template': template},
'version': self.api_version},
None).AndReturn(engine_response)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'validate_template',
+ {'namespace': None,
+ 'method': 'validate_template',
'args': {'template': template},
'version': self.api_version},
None).AndReturn({'Error': 'fubar'})
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_resource_types',
+ {'namespace': None,
+ 'method': 'list_resource_types',
'args': {},
'version': self.api_version},
None).AndReturn(engine_response)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_resource_types',
+ {'namespace': None,
+ 'method': 'list_resource_types',
'args': {},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("ValueError"))
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_stack_resources',
+ {'namespace': None,
+ 'method': 'list_stack_resources',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_stack_resources',
+ {'namespace': None,
+ 'method': 'list_stack_resources',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
}
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
}
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'describe_stack_resource',
+ {'namespace': None,
+ 'method': 'describe_stack_resource',
'args': {'stack_identity': stack_identity,
'resource_name': res_name},
'version': self.api_version},
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
]
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndReturn(engine_resp)
self.m.StubOutWithMock(rpc, 'call')
rpc.call(req.context, self.topic,
- {'method': 'list_events',
+ {'namespace': None,
+ 'method': 'list_events',
'args': {'stack_identity': stack_identity},
'version': self.api_version},
None).AndRaise(rpc_common.RemoteError("StackNotFound"))
watch = db_api.watch_data_create(self.ctx, values)
# Check there is one result returned
- result = self.man.show_watch_metric(self.ctx, namespace=None,
+ result = self.man.show_watch_metric(self.ctx,
+ metric_namespace=None,
metric_name=None)
self.assertEqual(1, len(result))
# Create another metric datapoint and check we get two
watch = db_api.watch_data_create(self.ctx, values)
- result = self.man.show_watch_metric(self.ctx, namespace=None,
+ result = self.man.show_watch_metric(self.ctx, metric_namespace=None,
metric_name=None)
self.assertEqual(2, len(result))
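Each watch_data_create call persists one datapoint, so the unfiltered show_watch_metric result grows by one per call. A hypothetical 'values' dict of the shape this test stores (the field names are an assumption based on the watch-data schema, which this diff does not show):

values = {'watch_rule_id': rule_id,
          'data': {u'Namespace': u'system/linux',
                   u'ServiceFailure': {u'Units': u'Counter',
                                       u'Value': 1}}}
watch = db_api.watch_data_create(ctx, values)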
setup_dummy_db()
self.ctx = context.get_admin_context()
- self.m.StubOutWithMock(self.ctx, 'username')
- self.ctx.username = self.username
+ self.m.StubOutWithMock(self.ctx, 'user')
+ self.ctx.user = self.username
self.ctx.tenant_id = 'test_tenant'
generic_rsrc.GenericResource.properties_schema = {}
def test_show_watch_metric(self):
self._test_engine_api('show_watch_metric', 'call',
- namespace=None, metric_name=None)
+ metric_namespace=None, metric_name=None)
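_test_engine_api (defined earlier in this test module) asserts that the client forwards its kwargs verbatim as the message 'args', so after the rename the expected envelope is roughly this (a sketch; api_version stands in for the test's self.api_version):

expected_msg = {'namespace': None,
                'method': 'show_watch_metric',
                'args': {'metric_namespace': None,
                         'metric_name': None},
                'version': api_version}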
def test_set_watch_state(self):
self._test_engine_api('set_watch_state', 'call',
kombu>=1.0.4
argparse
lxml>=2.3,<=2.3.5
+six
sqlalchemy-migrate>=0.7.2
python-novaclient>=2.11.0,<3
PasteDeploy==1.5.0
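The six requirement is presumably pulled in by the freshly synced openstack.common modules; it papers over py2/py3 differences, e.g.:

import six

text = six.text_type('metric')        # unicode on py2, str on py3
for name, value in six.iteritems({'a': 1}):
    pass                              # dict iteration on both majors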