From a5d16cfb8d6d953d2a9689ddc855cade604e1e9b Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Tue, 12 Jun 2012 17:07:18 -0400 Subject: [PATCH] Add missing ack to impl_qpid. Fix bug 1012374. Johannes Erdfelt pointed out that impl_qpid wasn't acking messages that it received. This turned out to be a nasty oversight, resulting in unbounded message queue growth inside of the python-qpid library. This fixes it. Change-Id: I0370293807f0282e1dbdd59246f70be031e888a9 Reviewed-on: https://review.openstack.org/9908 Reviewed-by: Vish Ishaya Approved: John Griffith Tested-by: Jenkins --- cinder/rpc/impl_qpid.py | 7 ++++++- cinder/tests/rpc/test_qpid.py | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/cinder/rpc/impl_qpid.py b/cinder/rpc/impl_qpid.py index 95ab00741..1a8c95bb9 100644 --- a/cinder/rpc/impl_qpid.py +++ b/cinder/rpc/impl_qpid.py @@ -134,7 +134,12 @@ class ConsumerBase(object): def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() - self.callback(message.content) + try: + self.callback(message.content) + except Exception: + LOG.exception(_("Failed to process message... skipping it.")) + finally: + self.session.acknowledge(message) def get_receiver(self): return self.receiver diff --git a/cinder/tests/rpc/test_qpid.py b/cinder/tests/rpc/test_qpid.py index 4ec865169..9b33b578c 100644 --- a/cinder/tests/rpc/test_qpid.py +++ b/cinder/tests/rpc/test_qpid.py @@ -270,6 +270,7 @@ class RpcQpidTestCase(test.TestCase): self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"result": "foo", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) if multi: self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) @@ -277,16 +278,19 @@ class RpcQpidTestCase(test.TestCase): qpid.messaging.Message( {"result": "bar", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn( qpid.messaging.Message( {"result": "baz", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( self.mock_receiver) self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( {"failure": False, "ending": True})) + self.mock_session.acknowledge(mox.IgnoreArg()) self.mock_session.close() self.mock_connection.session().AndReturn(self.mock_session) -- 2.45.2