log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
+ default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
- default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
- default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
+ default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('quantum.openstack.common.notifier.log_notifier' in
- CONF.notification_driver):
+ CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
- if CONF.verbose or CONF.debug:
+ if CONF.debug:
log_root.setLevel(logging.DEBUG)
- else:
+ elif CONF.verbose:
log_root.setLevel(logging.INFO)
+ else:
+ log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:
self._fmt = CONF.logging_default_format_string
if (record.levelno == logging.DEBUG and
- CONF.logging_debug_format_suffix):
+ CONF.logging_debug_format_suffix):
self._fmt += " " + CONF.logging_debug_format_suffix
# Cache this on the record, Logger will respect our formated copy
for reduction, methname in self.reducers:
if (len(self.tokens) >= len(reduction) and
- self.tokens[-len(reduction):] == reduction):
- # Get the reduction method
- meth = getattr(self, methname)
+ self.tokens[-len(reduction):] == reduction):
+ # Get the reduction method
+ meth = getattr(self, methname)
- # Reduce the token stream
- results = meth(*self.values[-len(reduction):])
+ # Reduce the token stream
+ results = meth(*self.values[-len(reduction):])
- # Update the tokens and values
- self.tokens[-len(reduction):] = [r[0] for r in results]
- self.values[-len(reduction):] = [r[1] for r in results]
+ # Update the tokens and values
+ self.tokens[-len(reduction):] = [r[0] for r in results]
+ self.values[-len(reduction):] = [r[1] for r in results]
- # Check for any more reductions
- return self.reduce()
+ # Check for any more reductions
+ return self.reduce()
def shift(self, tok, value):
"""Adds one more token to the state. Calls reduce()."""
# under the License.
import pprint
+import os
import socket
import string
import sys
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
+from quantum.openstack.common import processutils as utils
from quantum.openstack.common.rpc import common as rpc_common
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
+ cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- self.topic_proxy['zmq_replies'] = \
- ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
- zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ def publisher(waiter):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
+
+ try:
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+ except RPCException:
+ waiter.send_exception(*sys.exc_info())
+ return
+
+ self.topic_proxy[topic] = eventlet.queue.LightQueue(
+ CONF.rpc_zmq_topic_backlog)
+ self.sockets.append(out_sock)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ waiter.send(True)
+
+ while(True):
+ data = self.topic_proxy[topic].get()
+ out_sock.send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+ {'data': data})
+
+ wait_sock_creation = eventlet.event.Event()
+ eventlet.spawn(publisher, wait_sock_creation)
+
+ try:
+ wait_sock_creation.wait()
+ except RPCException:
+ LOG.error(_("Topic socket file creation failed."))
+ return
+
+ try:
+ self.topic_proxy[topic].put_nowait(data)
+ LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+ {'data': data})
+ except eventlet.queue.Full:
+ LOG.error(_("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
+
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
- LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
- self.topic_proxy[topic].send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+ super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, msg_id, topic, msg, timeout=None,
+ serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload)
+ _cast(addr, context, msg_id, topic, payload,
+ serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
_topic, _topic, msg, timeout, serialize,
force_envelope)
return
- return method(_addr, context, _topic, _topic, msg, timeout)
+ return method(_addr, context, _topic, _topic, msg, timeout,
+ serialize, force_envelope)
def create_connection(conf, new=True):