from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
-from cinder.openstack.common import processutils as utils
from cinder.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
def _serialize(data):
- """
- Serialization wrapper
+ """Serialization wrapper.
+
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
def _deserialize(data):
- """
- Deserialization wrapper
- """
+ """Deserialization wrapper."""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
class ZmqSocket(object):
- """
- A tiny wrapper around ZeroMQ to simplify the send/recv protocol
- and connection management.
+ """A tiny wrapper around ZeroMQ.
+ Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement).
"""
return
# We must unsubscribe, or we'll leak descriptors.
- if len(self.subscriptions) > 0:
+ if self.subscriptions:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
- def recv(self):
+ def recv(self, **kwargs):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
- return self.sock.recv_multipart()
+ return self.sock.recv_multipart(**kwargs)
- def send(self, data):
+ def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
- self.sock.send_multipart(data)
+ self.sock.send_multipart(data, **kwargs)
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=None, bind=False):
- if socket_type is None:
- socket_type = zmq.PUSH
- self.outq = ZmqSocket(addr, socket_type, bind=bind)
+ def __init__(self, addr):
+ self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
- def cast(self, msg_id, topic, data, envelope=False):
+ def cast(self, msg_id, topic, data, envelope):
msg_id = msg_id or 0
if not envelope:
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
- except rpc_common.ClientException, e:
+ except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
class ZmqBaseReactor(ConsumerBase):
- """
- A consumer class implementing a
- centralized casting broker (PULL-PUSH)
- for RoundRobin requests.
+ """A consumer class implementing a centralized casting broker (PULL-PUSH).
+
+ Used for RoundRobin requests.
"""
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
- self.mapping = {}
self.proxies = {}
self.threads = []
self.sockets = []
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
- def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
- zmq_type_out=None, in_bind=True, out_bind=True,
- subscribe=None):
+ def register(self, proxy, in_addr, zmq_type_in,
+ in_bind=True, subscribe=None):
LOG.info(_("Registering reactor"))
LOG.info(_("In reactor registered"))
- if not out_addr:
- return
-
- if zmq_type_out not in (zmq.PUSH, zmq.PUB):
- raise RPCException("Bad output socktype")
-
- # Items push out.
- outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
-
- self.mapping[inq] = outq
- self.mapping[outq] = inq
- self.sockets.append(outq)
-
- LOG.info(_("Out reactor registered"))
-
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
class ZmqProxy(ZmqBaseReactor):
- """
- A consumer class implementing a
- topic-based proxy, forwarding to
- IPC sockets.
+ """A consumer class implementing a topic-based proxy.
+
+ Forwards to IPC sockets.
"""
def __init__(self, conf):
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
- #TODO(ewindisch): use zero-copy (i.e. references, not copying)
- data = sock.recv()
- topic = data[1]
-
- LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+ data = sock.recv(copy=False)
+ topic = data[1].bytes
if topic.startswith('fanout~'):
sock_type = zmq.PUB
while(True):
data = self.topic_proxy[topic].get()
- out_sock.send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
- {'data': data})
+ out_sock.send(data, copy=False)
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
self.topic_proxy[topic].put_nowait(data)
- LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
- {'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
- """Runs the ZmqProxy service"""
+ """Runs the ZmqProxy service."""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
- if not os.path.isdir(ipc_dir):
- try:
- utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
- utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
- ipc_dir, run_as_root=True)
- utils.execute('chmod', '750', ipc_dir, run_as_root=True)
- except utils.ProcessExecutionError:
+ try:
+ os.makedirs(ipc_dir)
+ except os.error:
+ if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
- LOG.error(_("Could not create IPC directory %s") %
- (ipc_dir, ))
-
+ LOG.error(_("Required IPC directory does not exist at"
+ " %s") % (ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,
- zmq.PULL,
- out_bind=True)
+ zmq.PULL)
except zmq.ZMQError:
+ if os.access(ipc_dir, os.X_OK):
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Permission denied to IPC directory at"
+ " %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
- Takes a list and returns a dictionary.
- i.e. [1,2,3,4] => {1: 2, 3: 4}
+
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
class ZmqReactor(ZmqBaseReactor):
- """
- A consumer class implementing a
- consumer for messages. Can also be
- used as a 1:1 proxy
+ """A consumer class implementing a consumer for messages.
+
+ Can also be used as a 1:1 proxy
"""
def __init__(self, conf):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
- if sock in self.mapping:
- LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
- 'data': data})
- self.mapping[sock].send(data)
- return
proxy = self.proxies[sock]
else:
sock_type = zmq.PULL
subscribe = None
- topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+ topic = '.'.join((topic, CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
- """
- Wraps the sending of messages,
- dispatches to the matchmaker and sends
- message to all relevant hosts.
+ """Wraps the sending of messages.
+
+ Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
- if len(queues) == 0:
+ if not queues:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
"""Send a message to all listening and expect no reply."""
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
- _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+ LOG.error(_('topic is %s.') % topic)
+ if topic:
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, envelope):
- """
- Send notification event.
+ """Send notification event.
+
Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority.
"""
def _get_matchmaker(*args, **kwargs):
global matchmaker
if not matchmaker:
- matchmaker = importutils.import_object(
- CONF.rpc_zmq_matchmaker, *args, **kwargs)
+ mm = CONF.rpc_zmq_matchmaker
+ if mm.endswith('matchmaker.MatchMakerRing'):
+ mm.replace('matchmaker', 'matchmaker_ring')
+ LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
+ ' %(new)s instead') % dict(
+ orig=CONF.rpc_zmq_matchmaker, new=mm))
+ matchmaker = importutils.import_object(mm, *args, **kwargs)
return matchmaker