AMQP, but is deprecated and predates this code.
"""
+import collections
import inspect
import sys
import uuid
cfg.CONF.register_opts(amqp_opts)
+UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
'failure': failure}
if ending:
msg['ending'] = True
+ _add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
msg.update(context_d)
+class _MsgIdCache(object):
+ """This class checks any duplicate messages."""
+
+ # NOTE: This value is considered can be a configuration item, but
+ # it is not necessary to change its value in most cases,
+ # so let this value as static for now.
+ DUP_MSG_CHECK_SIZE = 16
+
+ def __init__(self, **kwargs):
+ self.prev_msgids = collections.deque([],
+ maxlen=self.DUP_MSG_CHECK_SIZE)
+
+ def check_duplicate_message(self, message_data):
+ """AMQP consumers may read same message twice when exceptions occur
+ before ack is returned. This method prevents doing it.
+ """
+ if UNIQUE_ID in message_data:
+ msg_id = message_data[UNIQUE_ID]
+ if msg_id not in self.prev_msgids:
+ self.prev_msgids.append(msg_id)
+ else:
+ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+ """Add unique_id for checking duplicate messages."""
+ unique_id = uuid.uuid4().hex
+ msg.update({UNIQUE_ID: unique_id})
+ LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
connection_pool=connection_pool,
)
self.proxy = proxy
+ self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
+ self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
def _process_data(self, data):
result = None
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
self._done = False
self._got_ending = False
self._conf = conf
+ self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
+ _add_unique_id(msg)
pack_context(msg, context)
# TODO(pekowski): Remove this flag and the code under the if clause
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if serialize:
- data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
+ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
- topic = topic.split('.', 1)[0]
+ topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
super(ZmqProxy, self).consume_in_thread()
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
- ctx = RpcContext.unmarshal(ctx)
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- proxy = self.proxies[sock]
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
+
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
+
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
-
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
self.reactor.consume_in_thread()
-def _cast(addr, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
def _call(addr, context, topic, msg, timeout=None,
- serialize=True, force_envelope=False):
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, topic, payload,
- serialize=serialize, force_envelope=force_envelope)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])[-1]['args']['response']
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, msg, timeout, serialize,
- force_envelope, _msg_id)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
return method(_addr, context, _topic, msg, timeout,
- serialize, force_envelope)
+ envelope)
def create_connection(conf, new=True):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = kwargs.pop('envelope')
- kwargs['force_envelope'] = True
+ kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)