from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
+from eventlet import queue
+# TODO(pekowski): Remove import cfg and the comment below in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import common as rpc_common
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+ cfg.BoolOpt('amqp_rpc_single_reply_queue',
+ default=False,
+ help='Enable a fast single reply queue if using AMQP based '
+ 'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
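+
+# Illustrative only (not part of this change): the new behavior is opt-in.
+# Operators would enable it through the usual oslo.config mechanisms, e.g.
+# in the service configuration file:
+#   [DEFAULT]
+#   amqp_rpc_single_reply_queue = True
+# or from test code via:
+#   cfg.CONF.set_override('amqp_rpc_single_reply_queue', True)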
LOG = logging.getLogger(__name__)
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
+ self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
def empty(self):
while self.free_items:
self.get().close()
+ # Force a new connection pool to be created.
+ # Note that this was added due to failing unit test cases. The issue
+ # is that the above "while" loop gets all the cached connections from
+ # the pool and closes them, but never returns them to the pool: a pool
+ # leak. The unit tests hang waiting for an item to be returned to the
+ # pool. The unit tests get here via the tearDown() method. In the
+ # run-time code, it gets here via cleanup(), which only appears in
+ # service.py just before doing a sys.exit(), so cleanup() only happens
+ # once and the leakage is not a problem.
+ self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
+ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+ self.connection.join_consumer_pool(callback,
+ pool_name,
+ topic,
+ exchange_name)
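+ # Illustrative sketch (assumed call site, not shown in this change):
+ # a service sharing one queue of work across several workers might do:
+ #   self.conn.join_consumer_pool(callback=my_handler,
+ #                                pool_name='my_pool',
+ #                                topic='my_topic',
+ #                                exchange_name='my_exchange')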
+
def consume_in_thread(self):
self.connection.consume_in_thread()
raise rpc_common.InvalidRPCConnectionReuse()
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+ """Connection class for RPC replies / callbacks."""
+ def __init__(self, conf, connection_pool):
+ self._call_waiters = {}
+ self._num_call_waiters = 0
+ self._num_call_waiters_wrn_threshold = 10
+ self._reply_q = 'reply_' + uuid.uuid4().hex
+ super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+ self.declare_direct_consumer(self._reply_q, self._process_data)
+ self.consume_in_thread()
+
+ def _process_data(self, message_data):
+ msg_id = message_data.pop('_msg_id', None)
+ waiter = self._call_waiters.get(msg_id)
+ if not waiter:
+ LOG.warn(_('No calling threads waiting for msg_id: %s'
+ ', message: %s') % (msg_id, message_data))
+ else:
+ waiter.put(message_data)
+
+ def add_call_waiter(self, waiter, msg_id):
+ self._num_call_waiters += 1
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
+ LOG.warn(_('Number of call waiters is greater than warning '
+ 'threshold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshold)
+ self._num_call_waiters_wrn_threshold *= 2
+ self._call_waiters[msg_id] = waiter
+
+ def del_call_waiter(self, msg_id):
+ self._num_call_waiters -= 1
+ del self._call_waiters[msg_id]
+
+ def get_reply_q(self):
+ return self._reply_q
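+
+ # Rough flow sketch (illustrative, pieced together from the methods
+ # above): a caller registers a waiter under its msg_id, the request is
+ # sent with '_reply_q' set to get_reply_q(), and the consumer routes
+ # the reply back to that waiter:
+ #   reply_proxy.add_call_waiter(waiter, msg_id)
+ #   ... reply arrives on the shared queue ...
+ #   _process_data(message_data)  # -> waiter.put(message_data)
+ #   reply_proxy.del_call_waiter(msg_id)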
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+ failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+ # If a reply_q exists, add the msg_id to the reply and pass the
+ # reply_q to direct_send() to use it as the response queue.
+ # Otherwise use the msg_id for backward compatibility.
+ if reply_q:
+ msg['_msg_id'] = msg_id
+ conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+ else:
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
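+ # Illustrative reply shapes (hedged; the full msg construction is
+ # elided above): with a reply_q the payload travels on the shared
+ # queue and carries its own msg_id, roughly
+ #   {'failure': None, 'ending': True, '_msg_id': msg_id, ...}
+ # while the legacy path sends the same payload, minus '_msg_id', to a
+ # per-call queue named after msg_id.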
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
+ self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
+ values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
- msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending, log_failure)
+ msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+ reply, failure, ending, log_failure)
if ending:
self.msg_id = None
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
msg.update(context_d)
-class ProxyCallback(object):
- """Calls methods on a proxy object based on method and args."""
+class _ThreadPoolWithWait(object):
+ """Base class for a delayed invocation manager, used by the
+ Connection class to start up green threads that handle incoming
+ messages.
+ """
- def __init__(self, conf, proxy, connection_pool):
- self.proxy = proxy
+ def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+ """Wraps a straight callback to allow it to be invoked in a green
+ thread.
+ """
+
+ def __init__(self, conf, callback, connection_pool):
+ """
+ :param conf: cfg.CONF instance
+ :param callback: a callable (probably a function)
+ :param connection_pool: connection pool as returned by
+ get_connection_pool()
+ """
+ super(CallbackWrapper, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.callback = callback
+
+ def __call__(self, message_data):
+ self.pool.spawn_n(self.callback, message_data)
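+
+ # Illustrative use (assumption; the call site is outside this hunk):
+ #   wrapper = CallbackWrapper(conf, my_callback, connection_pool)
+ #   connection.declare_topic_consumer(topic, wrapper)
+ # Each incoming message is then handled in its own green thread.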
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+ """Calls methods on a proxy object based on method and args."""
+
+ def __init__(self, conf, proxy, connection_pool):
+ super(ProxyCallback, self).__init__(
+ conf=conf,
+ connection_pool=connection_pool,
+ )
+ self.proxy = proxy
+
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
- def wait(self):
- """Wait for all callback threads to exit."""
- self.pool.waitall()
+
+class MulticallProxyWaiter(object):
+ def __init__(self, conf, msg_id, timeout, connection_pool):
+ self._msg_id = msg_id
+ self._timeout = timeout or conf.rpc_response_timeout
+ self._reply_proxy = connection_pool.reply_proxy
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
+ self._dataqueue = queue.LightQueue()
+ # Add this caller to the reply proxy's call_waiters
+ self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+ def put(self, data):
+ self._dataqueue.put(data)
+
+ def done(self):
+ if self._done:
+ return
+ self._done = True
+ # Remove this caller from reply proxy's call_waiters
+ self._reply_proxy.del_call_waiter(self._msg_id)
+
+ def _process_data(self, data):
+ result = None
+ if data['failure']:
+ failure = data['failure']
+ result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+ elif data.get('ending', False):
+ self._got_ending = True
+ else:
+ result = data['result']
+ return result
+
+ def __iter__(self):
+ """Yield results until a reply arrives with an 'ending' flag."""
+ if self._done:
+ raise StopIteration
+ while True:
+ try:
+ data = self._dataqueue.get(timeout=self._timeout)
+ result = self._process_data(data)
+ except queue.Empty:
+ LOG.exception(_('Timed out waiting for RPC response.'))
+ self.done()
+ raise rpc_common.Timeout()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
+ if isinstance(result, Exception):
+ self.done()
+ raise result
+ yield result
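+
+ # Illustrative use (hedged): multicall() below constructs one of these
+ # waiters and hands it back to the caller, who simply iterates it:
+ #   for reply in wait_msg:
+ #       handle(reply)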
+# TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
return ConnectionContext(conf, connection_pool, pooled=not new)
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
+ # TODO(pekowski): Remove all these comments in Havana.
+ # For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
+ # For amqp_rpc_single_reply_queue = True,
+ # the 'with' statement is mandatory for closing the connection.
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ # TODO(pekowski): Remove this flag and the code under the if clause
+ # in Havana.
+ if not conf.amqp_rpc_single_reply_queue:
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ else:
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
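+
+# Illustrative caller (hedged; mirrors how multicall() is consumed by
+# call() elsewhere in this module):
+#   for reply in multicall(conf, ctxt, 'topic',
+#                          {'method': 'echo', 'args': {'value': 42}},
+#                          timeout=None, connection_pool=pool):
+#       LOG.debug(_('reply: %s'), reply)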