]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add missing ack to impl_qpid.
authorRussell Bryant <rbryant@redhat.com>
Tue, 12 Jun 2012 21:07:18 +0000 (17:07 -0400)
committerJenkins <jenkins@review.openstack.org>
Wed, 18 Jul 2012 16:42:22 +0000 (16:42 +0000)
Fix bug 1012374.

Johannes Erdfelt pointed out that impl_qpid wasn't acking messages that
it received.  This turned out to be a nasty oversight, resulting in
unbounded message queue growth inside of the python-qpid library.  This
fixes it.

Change-Id: I0370293807f0282e1dbdd59246f70be031e888a9
Reviewed-on: https://review.openstack.org/9908
Reviewed-by: Vish Ishaya <vishvananda@gmail.com>
Approved: John Griffith <john.griffith@solidfire.com>
Tested-by: Jenkins
cinder/rpc/impl_qpid.py
cinder/tests/rpc/test_qpid.py

index 95ab00741e888d8d3413a4f08b771d71054f94a4..1a8c95bb9dfc9902f03cdfb3baaca99cf3c2d46b 100644 (file)
@@ -134,7 +134,12 @@ class ConsumerBase(object):
     def consume(self):
         """Fetch the message and pass it to the callback object"""
         message = self.receiver.fetch()
-        self.callback(message.content)
+        try:
+            self.callback(message.content)
+        except Exception:
+            LOG.exception(_("Failed to process message... skipping it."))
+        finally:
+            self.session.acknowledge(message)
 
     def get_receiver(self):
         return self.receiver
index 4ec86516908302849a1de27ddb4f330b0872237e..9b33b578c3fed680a030105dfc62e0e010057928 100644 (file)
@@ -270,6 +270,7 @@ class RpcQpidTestCase(test.TestCase):
                                                         self.mock_receiver)
         self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
                         {"result": "foo", "failure": False, "ending": False}))
+        self.mock_session.acknowledge(mox.IgnoreArg())
         if multi:
             self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
                                                         self.mock_receiver)
@@ -277,16 +278,19 @@ class RpcQpidTestCase(test.TestCase):
                             qpid.messaging.Message(
                                 {"result": "bar", "failure": False,
                                  "ending": False}))
+            self.mock_session.acknowledge(mox.IgnoreArg())
             self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
                                                         self.mock_receiver)
             self.mock_receiver.fetch().AndReturn(
                             qpid.messaging.Message(
                                 {"result": "baz", "failure": False,
                                  "ending": False}))
+            self.mock_session.acknowledge(mox.IgnoreArg())
         self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
                                                         self.mock_receiver)
         self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
                         {"failure": False, "ending": True}))
+        self.mock_session.acknowledge(mox.IgnoreArg())
         self.mock_session.close()
         self.mock_connection.session().AndReturn(self.mock_session)