From 68a5a387020cdb0c5d664b7add2d897134877d19 Mon Sep 17 00:00:00 2001 From: Ben Nemec Date: Fri, 7 Jun 2013 16:35:14 +0000 Subject: [PATCH] Sync Qpid RPC fix from Oslo Qpid cannot serialize dicts containing strings longer than 65535 characters. This change syncs the fix from Oslo to Quantum. Fixes bug 1175808 Change-Id: I48071abffa86e71727deed05aca08ac475cbaf05 --- quantum/openstack/common/rpc/impl_qpid.py | 111 ++++++++++++++++------ 1 file changed, 82 insertions(+), 29 deletions(-) diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index 5a677743e..3c9bf6574 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -31,6 +31,7 @@ from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import amqp as rpc_amqp from quantum.openstack.common.rpc import common as rpc_common +qpid_codec = importutils.try_import("qpid.codec010") qpid_messaging = importutils.try_import("qpid.messaging") qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") @@ -69,6 +70,8 @@ qpid_opts = [ cfg.CONF.register_opts(qpid_opts) +JSON_CONTENT_TYPE = 'application/json; charset=utf8' + class ConsumerBase(object): """Consumer base class.""" @@ -118,15 +121,32 @@ class ConsumerBase(object): self.reconnect(session) def reconnect(self, session): - """Re-declare the receiver after a qpid reconnect""" + """Re-declare the receiver after a qpid reconnect.""" self.session = session self.receiver = session.receiver(self.address) self.receiver.capacity = 1 + def _unpack_json_msg(self, msg): + """Load the JSON data in msg if msg.content_type indicates that it + is necessary. Put the loaded data back into msg.content and + update msg.content_type appropriately. + + A Qpid Message containing a dict will have a content_type of + 'amqp/map', whereas one containing a string that needs to be converted + back from JSON will have a content_type of JSON_CONTENT_TYPE. + + :param msg: a Qpid Message object + :returns: None + """ + if msg.content_type == JSON_CONTENT_TYPE: + msg.content = jsonutils.loads(msg.content) + msg.content_type = 'amqp/map' + def consume(self): - """Fetch the message and pass it to the callback object""" + """Fetch the message and pass it to the callback object.""" message = self.receiver.fetch() try: + self._unpack_json_msg(message) msg = rpc_common.deserialize_msg(message.content) self.callback(msg) except Exception: @@ -139,7 +159,7 @@ class ConsumerBase(object): class DirectConsumer(ConsumerBase): - """Queue/consumer class for 'direct'""" + """Queue/consumer class for 'direct'.""" def __init__(self, conf, session, msg_id, callback): """Init a 'direct' queue. @@ -157,7 +177,7 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): - """Consumer class for 'topic'""" + """Consumer class for 'topic'.""" def __init__(self, conf, session, topic, callback, name=None, exchange_name=None): @@ -177,7 +197,7 @@ class TopicConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase): - """Consumer class for 'fanout'""" + """Consumer class for 'fanout'.""" def __init__(self, conf, session, topic, callback): """Init a 'fanout' queue. @@ -196,7 +216,7 @@ class FanoutConsumer(ConsumerBase): class Publisher(object): - """Base Publisher class""" + """Base Publisher class.""" def __init__(self, session, node_name, node_opts=None): """Init the Publisher class with the exchange_name, routing_key, @@ -225,16 +245,43 @@ class Publisher(object): self.reconnect(session) def reconnect(self, session): - """Re-establish the Sender after a reconnection""" + """Re-establish the Sender after a reconnection.""" self.sender = session.sender(self.address) + def _pack_json_msg(self, msg): + """Qpid cannot serialize dicts containing strings longer than 65535 + characters. This function dumps the message content to a JSON + string, which Qpid is able to handle. + + :param msg: May be either a Qpid Message object or a bare dict. + :returns: A Qpid Message with its content field JSON encoded. + """ + try: + msg.content = jsonutils.dumps(msg.content) + except AttributeError: + # Need to have a Qpid message so we can set the content_type. + msg = qpid_messaging.Message(jsonutils.dumps(msg)) + msg.content_type = JSON_CONTENT_TYPE + return msg + def send(self, msg): - """Send a message""" + """Send a message.""" + try: + # Check if Qpid can encode the message + check_msg = msg + if not hasattr(check_msg, 'content_type'): + check_msg = qpid_messaging.Message(msg) + content_type = check_msg.content_type + enc, dec = qpid_messaging.message.get_codec(content_type) + enc(check_msg.content) + except qpid_codec.CodecException: + # This means the message couldn't be serialized as a dict. + msg = self._pack_json_msg(msg) self.sender.send(msg) class DirectPublisher(Publisher): - """Publisher class for 'direct'""" + """Publisher class for 'direct'.""" def __init__(self, conf, session, msg_id): """Init a 'direct' publisher.""" super(DirectPublisher, self).__init__(session, msg_id, @@ -242,7 +289,7 @@ class DirectPublisher(Publisher): class TopicPublisher(Publisher): - """Publisher class for 'topic'""" + """Publisher class for 'topic'.""" def __init__(self, conf, session, topic): """init a 'topic' publisher. """ @@ -252,7 +299,7 @@ class TopicPublisher(Publisher): class FanoutPublisher(Publisher): - """Publisher class for 'fanout'""" + """Publisher class for 'fanout'.""" def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ @@ -262,7 +309,7 @@ class FanoutPublisher(Publisher): class NotifyPublisher(Publisher): - """Publisher class for notifications""" + """Publisher class for notifications.""" def __init__(self, conf, session, topic): """init a 'topic' publisher. """ @@ -330,7 +377,7 @@ class Connection(object): return self.consumers[str(receiver)] def reconnect(self): - """Handles reconnecting and re-establishing sessions and queues""" + """Handles reconnecting and re-establishing sessions and queues.""" attempt = 0 delay = 1 while True: @@ -381,14 +428,20 @@ class Connection(object): self.reconnect() def close(self): - """Close/release this connection""" + """Close/release this connection.""" self.cancel_consumer_thread() self.wait_on_proxy_callbacks() - self.connection.close() + try: + self.connection.close() + except Exception: + # NOTE(dripton) Logging exceptions that happen during cleanup just + # causes confusion; there's really nothing useful we can do with + # them. + pass self.connection = None def reset(self): - """Reset a connection so it can be used again""" + """Reset a connection so it can be used again.""" self.cancel_consumer_thread() self.wait_on_proxy_callbacks() self.session.close() @@ -412,7 +465,7 @@ class Connection(object): return self.ensure(_connect_error, _declare_consumer) def iterconsume(self, limit=None, timeout=None): - """Return an iterator that will consume from all queues/consumers""" + """Return an iterator that will consume from all queues/consumers.""" def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): @@ -436,7 +489,7 @@ class Connection(object): yield self.ensure(_error_callback, _consume) def cancel_consumer_thread(self): - """Cancel a consumer thread""" + """Cancel a consumer thread.""" if self.consumer_thread is not None: self.consumer_thread.kill() try: @@ -451,7 +504,7 @@ class Connection(object): proxy_cb.wait() def publisher_send(self, cls, topic, msg): - """Send to a publisher based on the publisher class""" + """Send to a publisher based on the publisher class.""" def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} @@ -481,15 +534,15 @@ class Connection(object): topic, callback) def declare_fanout_consumer(self, topic, callback): - """Create a 'fanout' consumer""" + """Create a 'fanout' consumer.""" self.declare_consumer(FanoutConsumer, topic, callback) def direct_send(self, msg_id, msg): - """Send a 'direct' message""" + """Send a 'direct' message.""" self.publisher_send(DirectPublisher, msg_id, msg) def topic_send(self, topic, msg, timeout=None): - """Send a 'topic' message""" + """Send a 'topic' message.""" # # We want to create a message with attributes, e.g. a TTL. We # don't really need to keep 'msg' in its JSON format any longer @@ -504,15 +557,15 @@ class Connection(object): self.publisher_send(TopicPublisher, topic, qpid_message) def fanout_send(self, topic, msg): - """Send a 'fanout' message""" + """Send a 'fanout' message.""" self.publisher_send(FanoutPublisher, topic, msg) def notify_send(self, topic, msg, **kwargs): - """Send a notify message on a topic""" + """Send a notify message on a topic.""" self.publisher_send(NotifyPublisher, topic, msg) def consume(self, limit=None): - """Consume from all queues/consumers""" + """Consume from all queues/consumers.""" it = self.iterconsume(limit=limit) while True: try: @@ -521,7 +574,7 @@ class Connection(object): return def consume_in_thread(self): - """Consumer from all queues/consumers in a greenthread""" + """Consumer from all queues/consumers in a greenthread.""" def _consumer_thread(): try: self.consume() @@ -532,7 +585,7 @@ class Connection(object): return self.consumer_thread def create_consumer(self, topic, proxy, fanout=False): - """Create a consumer that calls a method in a proxy object""" + """Create a consumer that calls a method in a proxy object.""" proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) @@ -548,7 +601,7 @@ class Connection(object): return consumer def create_worker(self, topic, proxy, pool_name): - """Create a worker that calls a method in a proxy object""" + """Create a worker that calls a method in a proxy object.""" proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) @@ -591,7 +644,7 @@ class Connection(object): def create_connection(conf, new=True): - """Create a connection""" + """Create a connection.""" return rpc_amqp.create_connection( conf, new, rpc_amqp.get_connection_pool(conf, Connection)) -- 2.45.2