from quantum.common import utils
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
from quantum.version import version_info as quantum_version
cfg.IntOpt('max_subnet_host_routes', default=20),
cfg.IntOpt('dhcp_lease_duration', default=120),
cfg.BoolOpt('allow_overlapping_ips', default=False),
- cfg.StrOpt('control_exchange',
- default='quantum',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('host', default=utils.get_hostname()),
cfg.BoolOpt('force_gateway_on_subnet', default=False,
help=_("Ensure that configured gateway is on subnet")),
def parse(args):
+ rpc.set_defaults(control_exchange='quantum')
cfg.CONF(args=args, project='quantum',
version='%%prog %s' % quantum_version.version_string_with_vcs())
def _print_greenthreads():
- for i, gt in enumerate(find_objects(greenlet.greenlet)):
+ for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
traceback.print_stack(gt.gr_frame)
print
}
if CONF.backdoor_port is None:
- return
+ return None
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
pprint.pprint(val)
sys.displayhook = displayhook
- eventlet.spawn_n(eventlet.backdoor.backdoor_server,
- eventlet.listen(('localhost', CONF.backdoor_port)),
+ sock = eventlet.listen(('localhost', CONF.backdoor_port))
+ port = sock.getsockname()[1]
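+ # (a backdoor_port of 0 binds an ephemeral port, so report the port
+ # that was actually bound rather than the configured one)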
+ eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
+ return port
import logging
+from quantum.openstack.common.gettextutils import _
+
class Error(Exception):
def __init__(self, message=None):
except Exception, e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
- logging.exception('Uncaught exception')
+ logging.exception(_('Uncaught exception'))
#logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e))
raise
import sys
import traceback
+from quantum.openstack.common.gettextutils import _
+
@contextlib.contextmanager
def save_and_reraise_exception():
try:
yield
except Exception:
- logging.error('Original exception being dropped: %s' %
- (traceback.format_exception(type_, value, tb)))
+ logging.error(_('Original exception being dropped: %s'),
+ traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
- except (ValueError, AttributeError), exc:
+ except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %
(class_str,
traceback.format_exception(*sys.exc_info())))
level=level + 1)
else:
return value
- except TypeError, e:
+ except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)
from quantum.openstack.common import cfg
from quantum.openstack.common import fileutils
+from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
- '%(user_id)s %(project_id)s] %(instance)s'
+ default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
+ '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
- default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
- ' %(instance)s%(message)s',
+ default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
+ '%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
- default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
+ default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
+ '%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs):
- stdmsg = _("Deprecated Config: %s") % msg
+ stdmsg = _("Deprecated: %s") % msg
if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg)
_setup_logging_from_conf(product_name)
+def set_defaults(logging_context_format_string):
+ cfg.set_defaults(log_opts,
+ logging_context_format_string=
+ logging_context_format_string)
+
+
def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler,
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
+from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__)
try:
while self._running:
+ start = timeutils.utcnow()
self.f(*self.args, **self.kw)
+ end = timeutils.utcnow()
if not self._running:
break
- greenthread.sleep(interval)
+ delay = interval - timeutils.delta_seconds(start, end)
+ if delay <= 0:
+ LOG.warn(_('task run outlasted interval by %s sec') %
+ -delay)
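+ # e.g. interval=10 with a 12s run gives delay == -2: warn about
+ # the 2 sec overrun, then sleep(0) to just yield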
+ greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e:
self.stop()
done.send(e.retvalue)
for driver in _get_drivers():
try:
driver.notify(context, msg)
- except Exception, e:
+ except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. "
- "Payload=%(payload)s") % locals())
+ "Payload=%(payload)s")
+ % dict(e=e, payload=payload))
_drivers = None
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
- except ImportError as e:
+ except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message)
- except Exception, e:
+ except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
--- /dev/null
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''Messaging based notification driver, with message envelopes.'''
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common import context as req_context
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+ 'topics', default=['notifications', ],
+ help='AMQP topic(s) used for openstack notifications')
+
+opt_group = cfg.OptGroup(name='rpc_notifier2',
+ title='Options for rpc_notifier2')
+
+CONF = cfg.CONF
+CONF.register_group(opt_group)
+CONF.register_opt(notification_topic_opt, opt_group)
+
+
+def notify(context, message):
+ """Sends a notification via RPC"""
+ if not context:
+ context = req_context.get_admin_context()
+ priority = message.get('priority',
+ CONF.default_notification_level)
+ priority = priority.lower()
+ for topic in CONF.rpc_notifier2.topics:
+ topic = '%s.%s' % (topic, priority)
+ try:
+ rpc.notify(context, topic, message, envelope=True)
+ except Exception:
+ LOG.exception(_("Could not send notification to %(topic)s. "
+ "Payload=%(message)s"), locals())
ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
- " ticks left until next run"), locals())
+ " ticks left until next run"),
+ dict(full_task_name=full_task_name,
+ ticks_to_skip=ticks_to_skip))
self._ticks_to_skip[task_name] -= 1
continue
self._ticks_to_skip[task_name] = task._ticks_between_runs
- LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+ LOG.debug(_("Running periodic task %(full_task_name)s"),
+ dict(full_task_name=full_task_name))
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
- LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
- locals())
+ LOG.exception(_("Error during %(full_task_name)s:"
+ " %(e)s"),
+ dict(e=e, full_task_name=full_task_name))
default=['quantum.openstack.common.exception',
'nova.exception',
'cinder.exception',
+ 'exceptions',
],
help='Modules of exceptions that are permitted to be recreated '
'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
- #
- # The following options are not registered here, but are expected to be
- # present. The project using this library must register these options with
- # the configuration so that project-specific defaults may be defined.
- #
- #cfg.StrOpt('control_exchange',
- # default='nova',
- # help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.StrOpt('control_exchange',
+ default='openstack',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
+def set_defaults(control_exchange):
+ cfg.set_defaults(rpc_opts,
+ control_exchange=control_exchange)
+
+
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
"""
import inspect
-import logging
import sys
import uuid
from eventlet import pools
from eventlet import semaphore
-from quantum.openstack.common import cfg
from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
+from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import common as rpc_common
# TODO(comstud): Timeout connections not used in a while
def create(self):
- LOG.debug('Pool creating new connection')
+ LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf)
def empty(self):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False):
+ ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
with ConnectionContext(conf, connection_pool) as conn:
if failure:
- failure = rpc_common.serialize_remote_exception(failure)
+ failure = rpc_common.serialize_remote_exception(failure,
+ log_failure)
try:
msg = {'result': reply, 'failure': failure}
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, msg)
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
- connection_pool=None):
+ connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending)
+ ending, log_failure)
if ending:
self.msg_id = None
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
- except Exception as e:
- LOG.exception('Exception during message handling')
+ except rpc_common.ClientException as e:
+ LOG.debug(_('Expected exception during message handling (%s)') %
+ e._exc_info[1])
+ ctxt.reply(None, e._exc_info,
+ connection_pool=self.connection_pool,
+ log_failure=False)
+ except Exception:
+ LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
- LOG.debug(_('Making asynchronous call on %s ...'), topic)
+ LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
- event_type = msg.get('event_type')
- LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
+ LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+ dict(event_type=msg.get('event_type'),
+ topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
def get_control_exchange(conf):
- try:
- return conf.control_exchange
- except cfg.NoSuchOptError:
- return 'openstack'
+ return conf.control_exchange
# under the License.
import copy
-import logging
+import sys
import traceback
+from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import local
+from quantum.openstack.common import log as logging
+CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently. For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc. This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer. See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple. It is:
+
+ {
+ 'quantum.version': <RPC Envelope Version as a String>,
+ 'quantum.message': <Application Message Payload, JSON encoded>
+ }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload. The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'quantum.version'
+_MESSAGE_KEY = 'quantum.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
try:
message = self.message % kwargs
- except Exception as e:
+ except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
"this endpoint.")
+class UnsupportedRpcEnvelopeVersion(RPCException):
+ message = _("Specified RPC envelope version, %(version)s, "
+ "not supported by this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',), }
+ SANITIZE = {'set_admin_password': [('args', 'new_pass')],
+ 'run_instance': [('args', 'admin_password')],
+ 'route_message': [('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'password'),
+ ('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'admin_password')]}
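+ # Each entry maps an rpc method to key paths into the message dict;
+ # walking a path and replacing its leaf hides values such as
+ # msg_data['args']['new_pass'] as '<SANITIZED>'.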
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
msg_data = copy.deepcopy(msg_data)
if has_method:
- method = msg_data['method']
- if method in SANITIZE:
- args_to_sanitize = SANITIZE[method]
- for arg in args_to_sanitize:
- try:
- msg_data['args'][arg] = "<SANITIZED>"
- except KeyError:
- pass
+ for arg in SANITIZE.get(msg_data['method'], []):
+ try:
+ d = msg_data
+ for elem in arg[:-1]:
+ d = d[elem]
+ d[arg[-1]] = '<SANITIZED>'
+ except KeyError as e:
+ LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
+ {'item': arg,
+ 'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
-def serialize_remote_exception(failure_info):
+def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
- LOG.error(tb)
+ if log_failure:
+ LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
# we cannot necessarily change an exception message so we must override
# the __str__ method.
failure.__class__ = new_ex_type
- except TypeError as e:
+ except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument.
failure.args = (message,) + failure.args[1:]
context.values['read_deleted'] = read_deleted
return context
+
+
+class ClientException(Exception):
+ """This encapsulates some actual exception that is expected to be
+ hit by an RPC proxy object. Merely instantiating it records the
+ current exception information, which will be passed back to the
+ RPC client without exceptional logging."""
+ def __init__(self):
+ self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ if type(e) in exceptions:
+ raise ClientException()
+ else:
+ raise
+
+
+def client_exceptions(*exceptions):
+ """Decorator for manager methods that raise expected exceptions.
+ Marking a Manager method with this decorator allows the declaration
+ of expected exceptions that the RPC layer should not consider fatal,
+ and not log as if they were generated in a real error scenario. Note
+ that this will cause listed exceptions to be wrapped in a
+ ClientException, which is used internally by the RPC layer."""
+ def outer(func):
+ def inner(*args, **kwargs):
+ return catch_client_exception(exceptions, func, *args, **kwargs)
+ return inner
+ return outer
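+# Usage sketch (the manager class and exception names are hypothetical):
+#
+#     class ComputeManager(object):
+#         @client_exceptions(InstanceNotFound)
+#         def get_instance(self, context, instance_id):
+#             ...
+#
+# An InstanceNotFound raised inside get_instance() travels back to the
+# RPC caller wrapped in ClientException, without error-level logging on
+# the server side.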
+
+
+def version_is_compatible(imp_version, version):
+ """Determine whether versions are compatible.
+
+ :param imp_version: The version implemented
+ :param version: The version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ imp_version_parts = imp_version.split('.')
+ if int(version_parts[0]) != int(imp_version_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
+ return False
+ return True
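+# A compatibility sketch (doctest style, illustrative):
+#
+#     >>> version_is_compatible('1.2', '1.1')   # older minor requested
+#     True
+#     >>> version_is_compatible('1.1', '1.2')   # newer minor requested
+#     False
+#     >>> version_is_compatible('2.0', '1.0')   # major mismatch
+#     False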
+
+
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
+ return raw_msg
+
+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+ # information about this format.
+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+ return msg
+
+
+def deserialize_msg(msg):
+ # NOTE(russellb): Hang on to your hats, this road is about to
+ # get a little bumpy.
+ #
+ # Robustness Principle:
+ # "Be strict in what you send, liberal in what you accept."
+ #
+ # At this point we have to do a bit of guessing about what it
+ # is we just received. Here is the set of possibilities:
+ #
+ # 1) We received a dict. This could be 2 things:
+ #
+ # a) Inspect it to see if it looks like a standard message envelope.
+ # If so, great!
+ #
+ # b) If it doesn't look like a standard message envelope, it could either
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
+ # Just return the message as-is.
+ #
+ # 2) It's any other non-dict type. Just return it and hope for the best.
+ # This case covers return values from rpc.call() from before message
+ # envelopes were used. (messages to call a method were always a dict)
+
+ if not isinstance(msg, dict):
+ # See #2 above.
+ return msg
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+ return raw_msg
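+# A round-trip sketch, assuming the defaults above:
+#
+#     >>> msg = {'method': 'ping', 'args': {}}
+#     >>> env = serialize_msg(msg, force_envelope=True)
+#     >>> env['quantum.version']
+#     '2.0'
+#     >>> deserialize_msg(env) == msg
+#     True
+#
+# With _SEND_RPC_ENVELOPE False and no force_envelope, serialize_msg()
+# returns the message unchanged and deserialize_msg() passes it through
+# as a version '1.0' message.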
there can be both versioned and unversioned APIs implemented in the same code
base.
-
-EXAMPLES:
+EXAMPLES
+========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server
Example 1) Adding a new method.
+-------------------------------
Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must
-be implemented for the method to be supported. For example:
+be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
Example 2) Adding a new parameter.
+----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
-The implementation of the method must not expect the parameter to be present.
+The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- @staticmethod
- def _is_compatible(mversion, version):
- """Determine whether versions are compatible.
-
- :param mversion: The API version implemented by a callback.
- :param version: The API version requested by an incoming message.
- """
- version_parts = version.split('.')
- mversion_parts = mversion.split('.')
- if int(version_parts[0]) != int(mversion_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(mversion_parts[1]): # Minor
- return False
- return True
-
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
- is_compatible = self._is_compatible(rpc_api_version, version)
+ is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+ version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
"""
import inspect
+# NOTE(russellb): We specifically want to use json, not our own jsonutils.
+# jsonutils has some extra logic to automatically convert objects to primitive
+# types so that they can be serialized. We want to catch all cases where
+# non-primitive types make it into this code and treat it as an error.
+import json
import time
import eventlet
-from quantum.openstack.common import jsonutils
from quantum.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
else:
res.append(rval)
done.send(res)
+ except rpc_common.ClientException as e:
+ done.send_exception(e._exc_info[1])
except Exception as e:
done.send_exception(e)
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
- jsonutils.dumps(msg)
+ json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg):
+ check_serialize(msg)
try:
call(conf, context, topic, msg)
except Exception:
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- callback(message.payload)
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
# Default options
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
"""
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
- self.connection.close()
+ self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
import functools
import itertools
-import logging
import time
import uuid
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
+ cfg.ListOpt('qpid_hosts',
+ default=['$qpid_hostname:$qpid_port'],
+ help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
- self.callback(message.content)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
+ if server_params and 'hostname' in server_params:
+ # NOTE(russellb) This enables support for cast_to_server.
+ server_params['qpid_hosts'] = [
+ '%s:%d' % (server_params['hostname'],
+ server_params.get('port', 5672))
+ ]
+
params = {
- 'hostname': self.conf.qpid_hostname,
- 'port': self.conf.qpid_port,
+ 'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
}
params.update(server_params or {})
- self.broker = params['hostname'] + ":" + str(params['port'])
+ self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
- self.connection_create()
+ self.connection_create(self.brokers[0])
self.reconnect()
- def connection_create(self):
+ def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(self.broker)
+ self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
except qpid.messaging.exceptions.ConnectionError:
pass
+ attempt = 0
delay = 1
while True:
+ broker = self.brokers[attempt % len(self.brokers)]
+ attempt += 1
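+ # cycle round-robin through the configured brokers so that one
+ # unreachable host does not block reconnection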
+
try:
- self.connection_create()
+ self.connection_create(broker)
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
time.sleep(delay)
delay = min(2 * delay, 60)
else:
+ LOG.info(_('Connected to AMQP server on %s'), broker)
break
- LOG.info(_('Connected to AMQP server on %s'), self.broker)
-
self.session = self.connection.session()
if self.consumers:
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
+ def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ if serialize:
+ data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
try:
result = proxy.dispatch(
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
+ except rpc_common.ClientException as e:
+ LOG.debug(_("Expected exception during message handling (%s)") %
+ e._exc_info[1])
+ return {'exc':
+ rpc_common.serialize_remote_exception(e._exc_info,
+ log_failure=False)}
except Exception:
+ LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
return
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
+ inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
msg_id, topic, style, in_msg = data
- ctx, request = _deserialize(in_msg)
+ ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, _topic, msg, timeout, serialize,
+ force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic = topic.replace('.', '-')
+ kwargs['serialize'] = kwargs.pop('envelope')
+ kwargs['force_envelope'] = True
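+ # with envelope=False the message is cast unserialized; with
+ # envelope=True it is serialized and the envelope forced on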
cast(conf, context, topic, msg, **kwargs)
import contextlib
import itertools
import json
-import logging
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
matchmaker_opts = [
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+ # Hook to allow the manager to do other initializations after
+ # the rpc connection is created.
+ if callable(getattr(self.manager, 'initialize_service_hook', None)):
+ self.manager.initialize_service_hook(self)
+
# Consume from all consumers in a thread
self.conn.consume_in_thread()
import time
import eventlet
-import greenlet
+import extras
import logging as std_logging
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.openstack.common import threadgroup
-try:
- from quantum.openstack.common import rpc
-except ImportError:
- rpc = None
+rpc = extras.try_import('quantum.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
:returns: None
"""
- self._services = []
+ self._services = threadgroup.ThreadGroup('launcher')
eventlet_backdoor.initialize_if_enabled()
@staticmethod
:returns: None
"""
- gt = eventlet.spawn(self.run_service, service)
- self._services.append(gt)
+ self._services.add_thread(self.run_service, service)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
- for service in self._services:
- service.kill()
+ self._services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
- for service in self._services:
- try:
- service.wait()
- except greenlet.GreenletExit:
- pass
+ self._services.wait()
class SignalExit(SystemExit):
except SystemExit as exc:
status = exc.code
finally:
- self.stop()
if rpc:
rpc.cleanup()
+ self.stop()
return status
def _wait_child(self):
try:
- pid, status = os.wait()
+ # Don't block if no child processes have exited
+ pid, status = os.waitpid(0, os.WNOHANG)
+ if not pid:
+ return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
- LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+ dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
- LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+ LOG.info(_('Child %(pid)d exited with status %(code)d'),
+ dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
while self.running:
wrap = self._wait_child()
if not wrap:
+ # Yield to other threads if no children have exited
+ # Sleep for a short time to avoid excessive CPU usage
+ # (see bug #1095346)
+ eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
class Service(object):
"""Service object for binaries running on hosts."""
- def __init__(self):
- self.tg = threadgroup.ThreadGroup('service')
+ def __init__(self, threads=1000):
+ self.tg = threadgroup.ThreadGroup('service', threads)
def start(self):
pass
def _run_shell_command(cmd):
- output = subprocess.Popen(["/bin/sh", "-c", cmd],
- stdout=subprocess.PIPE)
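+ # os.name is 'nt' on Windows, where commands run through cmd.exe /C;
+ # everywhere else fall back to a POSIX shell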
+ if os.name == 'nt':
+ output = subprocess.Popen(["cmd.exe", "/C", cmd],
+ stdout=subprocess.PIPE)
+ else:
+ output = subprocess.Popen(["/bin/sh", "-c", cmd],
+ stdout=subprocess.PIPE)
out = output.communicate()
if len(out) == 0:
return None
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
+
+ builders = ['html', 'man']
+
def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {}
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
- for builder in ['html', 'man']:
+ for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
+
+ class LocalBuildLatex(LocalBuildDoc):
+ builders = ['latex']
+
cmdclass['build_sphinx'] = LocalBuildDoc
+ cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass
from eventlet import greenpool
from eventlet import greenthread
-from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
def _thread_done(gt, *args, **kwargs):
- '''
- Callback function to be passed to GreenThread.link() when we spawn()
- Calls the ThreadGroup to notify if.
- '''
+ """ Callback function to be passed to GreenThread.link() when we spawn()
+ Calls the :class:`ThreadGroup` to notify if.
+
+ """
kwargs['group'].thread_done(kwargs['thread'])
class Thread(object):
- """
- Wrapper around a greenthread, that holds a reference to
- the ThreadGroup. The Thread will notify the ThreadGroup
- when it has done so it can be removed from the threads
- list.
+ """ Wrapper around a greenthread, that holds a reference to the
+ :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
+ it has done so it can be removed from the threads list.
"""
def __init__(self, name, thread, group):
self.name = name
class ThreadGroup(object):
- """
- The point of this class is to:
- - keep track of timers and greenthreads (making it easier to stop them
+ """ The point of the ThreadGroup classis to:
+
+ * keep track of timers and greenthreads (making it easier to stop them
when need be).
- - provide an easy API to add timers.
+ * provide an easy API to add timers.
"""
def __init__(self, name, thread_pool_size=10):
self.name = name
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
+ if isinstance(before, basestring):
+ before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
+ if isinstance(after, basestring):
+ after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
- return utcnow.override_time
+ try:
+ return utcnow.override_time.pop(0)
+ except AttributeError:
+ return utcnow.override_time
return datetime.datetime.utcnow()
def set_time_override(override_time=datetime.datetime.utcnow()):
- """Override utils.utcnow to return a constant time."""
+ """
+ Override utils.utcnow to return a constant time or a list thereof,
+ one at a time.
+ """
utcnow.override_time = override_time
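+# Sketch: set_time_override([t0, t1]) makes the next two utcnow() calls
+# return t0 then t1 (each popped once); a single datetime pins every
+# call to that value.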
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
- utcnow.override_time += timedelta
+ try:
+ # NOTE: rebinding the loop variable would leave the list unchanged
+ # (datetimes are immutable), so assign the advanced value back by index
+ for i, dt in enumerate(utcnow.override_time):
+ utcnow.override_time[i] = dt + timedelta
+ except TypeError:
+ utcnow.override_time += timedelta
def advance_time_seconds(seconds):
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
+
+
+def delta_seconds(before, after):
+ """
+ Compute the difference in seconds between two date, time, or
+ datetime objects (as a float, to microsecond resolution).
+ """
+ delta = after - before
+ try:
+ return delta.total_seconds()
+ except AttributeError:
+ return ((delta.days * 24 * 3600) + delta.seconds +
+ float(delta.microseconds) / (10 ** 6))
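+# e.g. delta_seconds(t, t + datetime.timedelta(minutes=1, seconds=30))
+# returns 90.0; the AttributeError branch covers Python < 2.7, where
+# timedelta has no total_seconds() method.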
import setup
-class _deferred_version_string(object):
- """Internal helper class which provides delayed version calculation."""
- def __init__(self, version_info, prefix):
- self.version_info = version_info
- self.prefix = prefix
-
- def __str__(self):
- return "%s%s" % (self.prefix, self.version_info.version_string())
-
- def __repr__(self):
- return "%s%s" % (self.prefix, self.version_info.version_string())
-
-
class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None):
self.python_package = python_package
self.pre_version = pre_version
self.version = None
+ self._cached_version = None
def _generate_version(self):
"""Defer to the openstack.common.setup routines for making a
version from git."""
if self.pre_version is None:
- return setup.get_post_version(self.python_package)
+ return setup.get_post_version(self.package)
else:
- return setup.get_pre_version(self.python_package, self.pre_version)
+ return setup.get_pre_version(self.package, self.pre_version)
def _newer_version(self, pending_version):
"""Check to see if we're working with a stale version or not.
else:
return '%s-dev' % (version_parts[0],)
- def deferred_version_string(self, prefix=""):
+ def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested
"""
- return _deferred_version_string(self, prefix)
+ if not self._cached_version:
+ self._cached_version = "%s%s" % (prefix,
+ self.version_string())
+ return self._cached_version
anyjson>=0.2.4
argparse
eventlet>=0.9.17
+extras
greenlet>=0.3.1
httplib2
iso8601>=0.1.4