local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
- cleanup_dir = True
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
"""
+import ConfigParser
import cStringIO
import inspect
import itertools
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
- 'If not set, logging will go to stdout.'),
+ 'If no default is set, logging will go to stdout.'),
cfg.StrOpt('log-dir',
deprecated_name='logdir',
- help='(Optional) The directory to keep log files in '
- '(will be prepended to --log-file)'),
+ help='(Optional) The base directory used for relative '
+ '--log-file paths'),
cfg.BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
return logging_excepthook
+class LogConfigError(Exception):
+
+ message = _('Error loading logging config %(log_config)s: %(err_msg)s')
+
+ def __init__(self, log_config, err_msg):
+ self.log_config = log_config
+ self.err_msg = err_msg
+
+ def __str__(self):
+ return self.message % dict(log_config=self.log_config,
+ err_msg=self.err_msg)
+
+
+def _load_log_config(log_config):
+ try:
+ logging.config.fileConfig(log_config)
+ except ConfigParser.Error, exc:
+ raise LogConfigError(log_config, str(exc))
+
+
def setup(product_name):
"""Setup logging."""
if CONF.log_config:
- logging.config.fileConfig(CONF.log_config)
+ _load_log_config(CONF.log_config)
else:
_setup_logging_from_conf()
sys.excepthook = _create_logging_excepthook(product_name)
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
- LOG.exception(_('Timed out waiting for RPC response.'))
self.done()
raise rpc_common.Timeout()
except Exception:
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
- msg = rpc_common.serialize_msg(msg, force_envelope=True)
+ msg = rpc_common.serialize_msg(msg)
conn.notify_send(topic, msg)
_MESSAGE_KEY = 'oslo.message'
-# TODO(russellb) Turn this on after Grizzly.
-_SEND_RPC_ENVELOPE = False
-
-
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
This exception is raised if the rpc_response_timeout is reached while
waiting for a response from the remote side.
"""
- message = _("Timeout while waiting on RPC response.")
+ message = _('Timeout while waiting on RPC response - '
+ 'topic: "%(topic)s", RPC method: "%(method)s" '
+ 'info: "%(info)s"')
+
+ def __init__(self, info=None, topic=None, method=None):
+ """
+ :param info: Extra info to convey to the user
+ :param topic: The topic that the rpc call was sent to
+ :param rpc_method_name: The name of the rpc method being
+ called
+ """
+ self.info = info
+ self.topic = topic
+ self.method = method
+ super(Timeout, self).__init__(
+ None,
+ info=info or _('<unknown>'),
+ topic=topic or _('<unknown>'),
+ method=method or _('<unknown>'))
class DuplicateMessageError(RPCException):
return True
-def serialize_msg(raw_msg, force_envelope=False):
- if not _SEND_RPC_ENVELOPE and not force_envelope:
- return raw_msg
-
+def serialize_msg(raw_msg):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ if not envelope:
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
- # Our real method is curried into msg['args']
+ # NOTE(ewindisch): context kwarg exists for Grizzly compat.
+ # this may be able to be removed earlier than
+ # 'I' if ConsumerBase.process were refactored.
+ if type(msg) is list:
+ payload = msg[-1]
+ else:
+ payload = msg
- child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
- self._get_response(child_ctx, proxy, topic, msg[1]),
+ self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
'method': '-reply',
'args': {
'msg_id': msg_id,
- 'context': mcontext,
'topic': reply_topic,
+ # TODO(ewindisch): safe to remove mcontext in I.
'msg': [mcontext, msg]
}
}
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
- return rpc.call(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.call(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def multicall(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.multicall() a remote method.
:param context: The request context
:param msg: The message to send, including the method and args.
:param topic: Override the topic for this message.
+ :param version: (Optional) Override the requested API version in this
+ message.
:param timeout: (Optional) A timeout to use when waiting for the
response. If no timeout is specified, a default timeout will be
used that is usually sufficient.
- :param version: (Optional) Override the requested API version in this
- message.
:returns: An iterator that lets you process each of the returned values
from the remote method as they arrive.
"""
self._set_version(msg, version)
- return rpc.multicall(context, self._get_topic(topic), msg, timeout)
+ real_topic = self._get_topic(topic)
+ try:
+ return rpc.multicall(context, real_topic, msg, timeout)
+ except rpc.common.Timeout as exc:
+ raise rpc.common.Timeout(
+ exc.info, real_topic, msg.get('method'))
def cast(self, context, msg, topic=None, version=None):
"""rpc.cast() a remote method.
" log --format='%aN <%aE>' | sort -u | "
"egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
+ signed_cmd = ("git log --git-dir=" + git_dir +
+ " | grep -i Co-authored-by: | sort -u")
+ signed_entries = _run_shell_command(signed_cmd)
+ if signed_entries:
+ new_entries = "\n".join(
+ [signed.split(":", 1)[1].strip()
+ for signed in signed_entries.split("\n") if signed])
+ changelog = "\n".join((changelog, new_entries))
mailmap = _parse_git_mailmap(git_dir)
with open(new_authors, 'w') as new_authors_fh:
new_authors_fh.write(canonicalize_emails(changelog, mailmap))