]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Update latest OSLO files
authorGary Kotton <gkotton@redhat.com>
Tue, 19 Feb 2013 13:48:03 +0000 (13:48 +0000)
committerGary Kotton <gkotton@redhat.com>
Tue, 19 Feb 2013 14:36:53 +0000 (14:36 +0000)
Change-Id: I17844d2bc5f8255833e8a5b3dc9412041de12826

quantum/openstack/common/notifier/log_notifier.py
quantum/openstack/common/notifier/rpc_notifier.py
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/common.py
quantum/openstack/common/rpc/impl_kombu.py
quantum/openstack/common/rpc/impl_qpid.py

index bc8f61865edd252d610b68f9e49979933fdc700a..cea98311c490460abf103acb6ff83ef4650e1980 100644 (file)
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-
 from oslo.config import cfg
 
 from quantum.openstack.common import jsonutils
index 8d84aa69c0ca5d96c61729478d8b695c71c4652b..e7caa97e28d2ed5adeec768849300829dbb336ec 100644 (file)
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-
 from oslo.config import cfg
 
 from quantum.openstack.common import context as req_context
index 66ffd528f78e1e8ab34a01cad97367f5037e555b..4b7b99dc2e23c3cd7c6d390cbdfde68d061e1e51 100644 (file)
@@ -32,13 +32,27 @@ import uuid
 from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
+from eventlet import queue
 
+# TODO(pekowsk): Remove import cfg and below comment in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
 from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import local
 from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import common as rpc_common
 
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+    cfg.BoolOpt('amqp_rpc_single_reply_queue',
+                default=False,
+                help='Enable a fast single reply queue if using AMQP based '
+                'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
 
 LOG = logging.getLogger(__name__)
 
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
         kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
         kwargs.setdefault("order_as_stack", True)
         super(Pool, self).__init__(*args, **kwargs)
+        self.reply_proxy = None
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
     def empty(self):
         while self.free_items:
             self.get().close()
+        # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is the above "while loop" gets all the cached connections from the
+        # pool and closes them, but never returns them to the pool, a pool
+        # leak. The unit tests hang waiting for an item to be returned to the
+        # pool. The unit tests get here via the teatDown() method. In the run
+        # time code, it gets here via cleanup() and only appears in service.py
+        # just before doing a sys.exit(), so cleanup() only happens once and
+        # the leakage is not a problem.
+        self.connection_cls.pool = None
 
 
 _pool_create_sem = semaphore.Semaphore()
@@ -137,6 +162,12 @@ class ConnectionContext(rpc_common.Connection):
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        self.connection.join_consumer_pool(callback,
+                                           pool_name,
+                                           topic,
+                                           exchange_name)
+
     def consume_in_thread(self):
         self.connection.consume_in_thread()
 
@@ -148,8 +179,45 @@ class ConnectionContext(rpc_common.Connection):
             raise rpc_common.InvalidRPCConnectionReuse()
 
 
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
-              ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+    """ Connection class for RPC replies / callbacks """
+    def __init__(self, conf, connection_pool):
+        self._call_waiters = {}
+        self._num_call_waiters = 0
+        self._num_call_waiters_wrn_threshhold = 10
+        self._reply_q = 'reply_' + uuid.uuid4().hex
+        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+        self.declare_direct_consumer(self._reply_q, self._process_data)
+        self.consume_in_thread()
+
+    def _process_data(self, message_data):
+        msg_id = message_data.pop('_msg_id', None)
+        waiter = self._call_waiters.get(msg_id)
+        if not waiter:
+            LOG.warn(_('no calling threads waiting for msg_id : %s'
+                       ', message : %s') % (msg_id, message_data))
+        else:
+            waiter.put(message_data)
+
+    def add_call_waiter(self, waiter, msg_id):
+        self._num_call_waiters += 1
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+            LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshhold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshhold)
+            self._num_call_waiters_wrn_threshhold *= 2
+        self._call_waiters[msg_id] = waiter
+
+    def del_call_waiter(self, msg_id):
+        self._num_call_waiters -= 1
+        del self._call_waiters[msg_id]
+
+    def get_reply_q(self):
+        return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+              failure=None, ending=False, log_failure=True):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
@@ -168,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
-        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibilty.
+        if reply_q:
+            msg['_msg_id'] = msg_id
+            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
 
 
 class RpcContext(rpc_common.CommonRpcContext):
     """Context that supports replying to a rpc.call"""
     def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
+        self.reply_q = kwargs.pop('reply_q', None)
         self.conf = kwargs.pop('conf')
         super(RpcContext, self).__init__(**kwargs)
 
@@ -182,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
         values = self.to_dict()
         values['conf'] = self.conf
         values['msg_id'] = self.msg_id
+        values['reply_q'] = self.reply_q
         return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
               connection_pool=None, log_failure=True):
         if self.msg_id:
-            msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
-                      ending, log_failure)
+            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+                      reply, failure, ending, log_failure)
             if ending:
                 self.msg_id = None
 
@@ -204,6 +281,7 @@ def unpack_context(conf, msg):
             value = msg.pop(key)
             context_dict[key[9:]] = value
     context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
     rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -224,15 +302,54 @@ def pack_context(msg, context):
     msg.update(context_d)
 
 
-class ProxyCallback(object):
-    """Calls methods on a proxy object based on method and args."""
+class _ThreadPoolWithWait(object):
+    """Base class for a delayed invocation manager used by
+    the Connection class to start up green threads
+    to handle incoming messages.
+    """
 
-    def __init__(self, conf, proxy, connection_pool):
-        self.proxy = proxy
+    def __init__(self, conf, connection_pool):
         self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
         self.connection_pool = connection_pool
         self.conf = conf
 
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+    """Wraps a straight callback to allow it to be invoked in a green
+    thread.
+    """
+
+    def __init__(self, conf, callback, connection_pool):
+        """
+        :param conf: cfg.CONF instance
+        :param callback: a callable (probably a function)
+        :param connection_pool: connection pool as returned by
+                                get_connection_pool()
+        """
+        super(CallbackWrapper, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.callback = callback
+
+    def __call__(self, message_data):
+        self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, conf, proxy, connection_pool):
+        super(ProxyCallback, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.proxy = proxy
+
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
 
@@ -293,11 +410,66 @@ class ProxyCallback(object):
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
 
-    def wait(self):
-        """Wait for all callback threads to exit."""
-        self.pool.waitall()
+
+class MulticallProxyWaiter(object):
+    def __init__(self, conf, msg_id, timeout, connection_pool):
+        self._msg_id = msg_id
+        self._timeout = timeout or conf.rpc_response_timeout
+        self._reply_proxy = connection_pool.reply_proxy
+        self._done = False
+        self._got_ending = False
+        self._conf = conf
+        self._dataqueue = queue.LightQueue()
+        # Add this caller to the reply proxy's call_waiters
+        self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+    def put(self, data):
+        self._dataqueue.put(data)
+
+    def done(self):
+        if self._done:
+            return
+        self._done = True
+        # Remove this caller from reply proxy's call_waiters
+        self._reply_proxy.del_call_waiter(self._msg_id)
+
+    def _process_data(self, data):
+        result = None
+        if data['failure']:
+            failure = data['failure']
+            result = rpc_common.deserialize_remote_exception(self._conf,
+                                                             failure)
+        elif data.get('ending', False):
+            self._got_ending = True
+        else:
+            result = data['result']
+        return result
+
+    def __iter__(self):
+        """Return a result until we get a reply with an 'ending" flag"""
+        if self._done:
+            raise StopIteration
+        while True:
+            try:
+                data = self._dataqueue.get(timeout=self._timeout)
+                result = self._process_data(data)
+            except queue.Empty:
+                LOG.exception(_('Timed out waiting for RPC response.'))
+                self.done()
+                raise rpc_common.Timeout()
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    self.done()
+            if self._got_ending:
+                self.done()
+                raise StopIteration
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            yield result
 
 
+#TODO(pekowski): Remove MulticallWaiter() in Havana.
 class MulticallWaiter(object):
     def __init__(self, conf, connection, timeout):
         self._connection = connection
@@ -353,22 +525,40 @@ def create_connection(conf, new, connection_pool):
     return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
+    # TODO(pekowski): Remove all these comments in Havana.
+    # For amqp_rpc_single_reply_queue = False,
     # Can't use 'with' for multicall, as it returns an iterator
     # that will continue to use the connection.  When it's done,
     # connection.close() will get called which will put it back into
     # the pool
+    # For amqp_rpc_single_reply_queue = True,
+    # The 'with' statement is mandatory for closing the connection
     LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
     pack_context(msg, context)
 
-    conn = ConnectionContext(conf, connection_pool)
-    wait_msg = MulticallWaiter(conf, conn, timeout)
-    conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    # TODO(pekowski): Remove this flag and the code under the if clause
+    #                 in Havana.
+    if not conf.amqp_rpc_single_reply_queue:
+        conn = ConnectionContext(conf, connection_pool)
+        wait_msg = MulticallWaiter(conf, conn, timeout)
+        conn.declare_direct_consumer(msg_id, wait_msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    else:
+        with _reply_proxy_create_sem:
+            if not connection_pool.reply_proxy:
+                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+        with ConnectionContext(conf, connection_pool) as conn:
+            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
     return wait_msg
 
 
index 4377eaac50f494cb4b17427d7415fc4d904bfdf8..4421e7d951c2450b9ab2cf2fe5e712a53eea0c37 100644 (file)
@@ -197,6 +197,28 @@ class Connection(object):
         """
         raise NotImplementedError()
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+
+        :param callback: Callable to be invoked for each message.
+        :type callback: callable accepting one argument
+        :param pool_name: The name of the consumer pool.
+        :type pool_name: str
+        :param topic: The routing topic for desired messages.
+        :type topic: str
+        :param exchange_name: The name of the message exchange where
+                              the client should attach. Defaults to
+                              the configured exchange.
+        :type exchange_name: str
+        """
+        raise NotImplementedError()
+
     def consume_in_thread(self):
         """Spawn a thread to handle incoming messages.
 
index 1635e166e1ebeca01748082fe355ee28971d5e38..dd4d47e2185021cbd1c368183e51a26300f88432 100644 (file)
@@ -165,9 +165,10 @@ class ConsumerBase(object):
             try:
                 msg = rpc_common.deserialize_msg(message.payload)
                 callback(msg)
-                message.ack()
             except Exception:
                 LOG.exception(_("Failed to process message... skipping it."))
+            finally:
+                message.ack()
 
         self.queue.consume(*args, callback=_callback, **options)
 
@@ -750,6 +751,30 @@ class Connection(object):
         self.proxy_callbacks.append(proxy_cb)
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+        self.declare_topic_consumer(
+            queue_name=pool_name,
+            topic=topic,
+            exchange_name=exchange_name,
+            callback=callback_wrapper,
+        )
+
 
 def create_connection(conf, new=True):
     """Create a connection"""
index 689c9704e2ec90eb2d9f8cf3b402c59ec6d936ae..ba0920a93649e88f1d9b53fce88b3d9498145b28 100644 (file)
@@ -560,6 +560,34 @@ class Connection(object):
 
         return consumer
 
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+
+        consumer = TopicConsumer(conf=self.conf,
+                                 session=self.session,
+                                 topic=topic,
+                                 callback=callback_wrapper,
+                                 name=pool_name,
+                                 exchange_name=exchange_name)
+
+        self._register_consumer(consumer)
+        return consumer
+
 
 def create_connection(conf, new=True):
     """Create a connection"""