]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Sync Qpid RPC fix from Oslo
authorBen Nemec <bnemec@us.ibm.com>
Fri, 7 Jun 2013 16:35:14 +0000 (16:35 +0000)
committerBen Nemec <bnemec@us.ibm.com>
Fri, 7 Jun 2013 16:35:14 +0000 (16:35 +0000)
Qpid cannot serialize dicts containing strings longer than 65535
characters.  This change syncs the fix from Oslo to Quantum.

Fixes bug 1175808

Change-Id: I48071abffa86e71727deed05aca08ac475cbaf05

quantum/openstack/common/rpc/impl_qpid.py

index 5a677743e550fe93cee87363fda7f8d0c316bc93..3c9bf6574ce6ddf476b553a790dac2434dc75f15 100644 (file)
@@ -31,6 +31,7 @@ from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import amqp as rpc_amqp
 from quantum.openstack.common.rpc import common as rpc_common
 
+qpid_codec = importutils.try_import("qpid.codec010")
 qpid_messaging = importutils.try_import("qpid.messaging")
 qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
 
@@ -69,6 +70,8 @@ qpid_opts = [
 
 cfg.CONF.register_opts(qpid_opts)
 
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
 
 class ConsumerBase(object):
     """Consumer base class."""
@@ -118,15 +121,32 @@ class ConsumerBase(object):
         self.reconnect(session)
 
     def reconnect(self, session):
-        """Re-declare the receiver after a qpid reconnect"""
+        """Re-declare the receiver after a qpid reconnect."""
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
 
+    def _unpack_json_msg(self, msg):
+        """Load the JSON data in msg if msg.content_type indicates that it
+           is necessary.  Put the loaded data back into msg.content and
+           update msg.content_type appropriately.
+
+        A Qpid Message containing a dict will have a content_type of
+        'amqp/map', whereas one containing a string that needs to be converted
+        back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+        :param msg: a Qpid Message object
+        :returns: None
+        """
+        if msg.content_type == JSON_CONTENT_TYPE:
+            msg.content = jsonutils.loads(msg.content)
+            msg.content_type = 'amqp/map'
+
     def consume(self):
-        """Fetch the message and pass it to the callback object"""
+        """Fetch the message and pass it to the callback object."""
         message = self.receiver.fetch()
         try:
+            self._unpack_json_msg(message)
             msg = rpc_common.deserialize_msg(message.content)
             self.callback(msg)
         except Exception:
@@ -139,7 +159,7 @@ class ConsumerBase(object):
 
 
 class DirectConsumer(ConsumerBase):
-    """Queue/consumer class for 'direct'"""
+    """Queue/consumer class for 'direct'."""
 
     def __init__(self, conf, session, msg_id, callback):
         """Init a 'direct' queue.
@@ -157,7 +177,7 @@ class DirectConsumer(ConsumerBase):
 
 
 class TopicConsumer(ConsumerBase):
-    """Consumer class for 'topic'"""
+    """Consumer class for 'topic'."""
 
     def __init__(self, conf, session, topic, callback, name=None,
                  exchange_name=None):
@@ -177,7 +197,7 @@ class TopicConsumer(ConsumerBase):
 
 
 class FanoutConsumer(ConsumerBase):
-    """Consumer class for 'fanout'"""
+    """Consumer class for 'fanout'."""
 
     def __init__(self, conf, session, topic, callback):
         """Init a 'fanout' queue.
@@ -196,7 +216,7 @@ class FanoutConsumer(ConsumerBase):
 
 
 class Publisher(object):
-    """Base Publisher class"""
+    """Base Publisher class."""
 
     def __init__(self, session, node_name, node_opts=None):
         """Init the Publisher class with the exchange_name, routing_key,
@@ -225,16 +245,43 @@ class Publisher(object):
         self.reconnect(session)
 
     def reconnect(self, session):
-        """Re-establish the Sender after a reconnection"""
+        """Re-establish the Sender after a reconnection."""
         self.sender = session.sender(self.address)
 
+    def _pack_json_msg(self, msg):
+        """Qpid cannot serialize dicts containing strings longer than 65535
+           characters.  This function dumps the message content to a JSON
+           string, which Qpid is able to handle.
+
+        :param msg: May be either a Qpid Message object or a bare dict.
+        :returns: A Qpid Message with its content field JSON encoded.
+        """
+        try:
+            msg.content = jsonutils.dumps(msg.content)
+        except AttributeError:
+            # Need to have a Qpid message so we can set the content_type.
+            msg = qpid_messaging.Message(jsonutils.dumps(msg))
+        msg.content_type = JSON_CONTENT_TYPE
+        return msg
+
     def send(self, msg):
-        """Send a message"""
+        """Send a message."""
+        try:
+            # Check if Qpid can encode the message
+            check_msg = msg
+            if not hasattr(check_msg, 'content_type'):
+                check_msg = qpid_messaging.Message(msg)
+            content_type = check_msg.content_type
+            enc, dec = qpid_messaging.message.get_codec(content_type)
+            enc(check_msg.content)
+        except qpid_codec.CodecException:
+            # This means the message couldn't be serialized as a dict.
+            msg = self._pack_json_msg(msg)
         self.sender.send(msg)
 
 
 class DirectPublisher(Publisher):
-    """Publisher class for 'direct'"""
+    """Publisher class for 'direct'."""
     def __init__(self, conf, session, msg_id):
         """Init a 'direct' publisher."""
         super(DirectPublisher, self).__init__(session, msg_id,
@@ -242,7 +289,7 @@ class DirectPublisher(Publisher):
 
 
 class TopicPublisher(Publisher):
-    """Publisher class for 'topic'"""
+    """Publisher class for 'topic'."""
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
@@ -252,7 +299,7 @@ class TopicPublisher(Publisher):
 
 
 class FanoutPublisher(Publisher):
-    """Publisher class for 'fanout'"""
+    """Publisher class for 'fanout'."""
     def __init__(self, conf, session, topic):
         """init a 'fanout' publisher.
         """
@@ -262,7 +309,7 @@ class FanoutPublisher(Publisher):
 
 
 class NotifyPublisher(Publisher):
-    """Publisher class for notifications"""
+    """Publisher class for notifications."""
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
@@ -330,7 +377,7 @@ class Connection(object):
         return self.consumers[str(receiver)]
 
     def reconnect(self):
-        """Handles reconnecting and re-establishing sessions and queues"""
+        """Handles reconnecting and re-establishing sessions and queues."""
         attempt = 0
         delay = 1
         while True:
@@ -381,14 +428,20 @@ class Connection(object):
                 self.reconnect()
 
     def close(self):
-        """Close/release this connection"""
+        """Close/release this connection."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
-        self.connection.close()
+        try:
+            self.connection.close()
+        except Exception:
+            # NOTE(dripton) Logging exceptions that happen during cleanup just
+            # causes confusion; there's really nothing useful we can do with
+            # them.
+            pass
         self.connection = None
 
     def reset(self):
-        """Reset a connection so it can be used again"""
+        """Reset a connection so it can be used again."""
         self.cancel_consumer_thread()
         self.wait_on_proxy_callbacks()
         self.session.close()
@@ -412,7 +465,7 @@ class Connection(object):
         return self.ensure(_connect_error, _declare_consumer)
 
     def iterconsume(self, limit=None, timeout=None):
-        """Return an iterator that will consume from all queues/consumers"""
+        """Return an iterator that will consume from all queues/consumers."""
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
@@ -436,7 +489,7 @@ class Connection(object):
             yield self.ensure(_error_callback, _consume)
 
     def cancel_consumer_thread(self):
-        """Cancel a consumer thread"""
+        """Cancel a consumer thread."""
         if self.consumer_thread is not None:
             self.consumer_thread.kill()
             try:
@@ -451,7 +504,7 @@ class Connection(object):
             proxy_cb.wait()
 
     def publisher_send(self, cls, topic, msg):
-        """Send to a publisher based on the publisher class"""
+        """Send to a publisher based on the publisher class."""
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
@@ -481,15 +534,15 @@ class Connection(object):
                               topic, callback)
 
     def declare_fanout_consumer(self, topic, callback):
-        """Create a 'fanout' consumer"""
+        """Create a 'fanout' consumer."""
         self.declare_consumer(FanoutConsumer, topic, callback)
 
     def direct_send(self, msg_id, msg):
-        """Send a 'direct' message"""
+        """Send a 'direct' message."""
         self.publisher_send(DirectPublisher, msg_id, msg)
 
     def topic_send(self, topic, msg, timeout=None):
-        """Send a 'topic' message"""
+        """Send a 'topic' message."""
         #
         # We want to create a message with attributes, e.g. a TTL. We
         # don't really need to keep 'msg' in its JSON format any longer
@@ -504,15 +557,15 @@ class Connection(object):
         self.publisher_send(TopicPublisher, topic, qpid_message)
 
     def fanout_send(self, topic, msg):
-        """Send a 'fanout' message"""
+        """Send a 'fanout' message."""
         self.publisher_send(FanoutPublisher, topic, msg)
 
     def notify_send(self, topic, msg, **kwargs):
-        """Send a notify message on a topic"""
+        """Send a notify message on a topic."""
         self.publisher_send(NotifyPublisher, topic, msg)
 
     def consume(self, limit=None):
-        """Consume from all queues/consumers"""
+        """Consume from all queues/consumers."""
         it = self.iterconsume(limit=limit)
         while True:
             try:
@@ -521,7 +574,7 @@ class Connection(object):
                 return
 
     def consume_in_thread(self):
-        """Consumer from all queues/consumers in a greenthread"""
+        """Consumer from all queues/consumers in a greenthread."""
         def _consumer_thread():
             try:
                 self.consume()
@@ -532,7 +585,7 @@ class Connection(object):
         return self.consumer_thread
 
     def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer that calls a method in a proxy object"""
+        """Create a consumer that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -548,7 +601,7 @@ class Connection(object):
         return consumer
 
     def create_worker(self, topic, proxy, pool_name):
-        """Create a worker that calls a method in a proxy object"""
+        """Create a worker that calls a method in a proxy object."""
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -591,7 +644,7 @@ class Connection(object):
 
 
 def create_connection(conf, new=True):
-    """Create a connection"""
+    """Create a connection."""
     return rpc_amqp.create_connection(
         conf, new,
         rpc_amqp.get_connection_pool(conf, Connection))