]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Replace loopingcall in notifier with a delayed send
authorCarl Baldwin <carl.baldwin@hp.com>
Wed, 2 Apr 2014 16:53:33 +0000 (16:53 +0000)
committerThomas Goirand <thomas@goirand.fr>
Mon, 9 Jun 2014 15:06:55 +0000 (23:06 +0800)
The loopingcall thread here was started before processes fork and so
the thread stops working after the fork call.  This is a problem that
will probably need to be worked out in the long run.

To ensure that this notifier works correctly in all processes, this
change replaces the persistent loopingcall thread with a thread
created on demand to delay and batch up notifications.  The first
notification will trigger spawning the thread to wait to send it.  Any
notifications that come in the meantime will notice that there is
already a thread waiting to send and will return without spawning.

Conflicts:
        neutron/notifiers/nova.py

Change-Id: I519d4e89b8cee341c0e1cfffbce3e77151e8202a
Closes-Bug: #1301035
(cherry picked from commit 3be1d7a75c5ec754825e99e0a8d95b4e1521ae4b)

neutron/notifiers/nova.py
neutron/tests/unit/notifiers/test_notifiers_nova.py

index e8c560e48e127aa534f2e695b7c0884b0fe31f87..48f959effde6d778f5748c456f8eba95310e93a1 100644 (file)
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import eventlet
 import novaclient.v1_1.client as nclient
 from novaclient.v1_1.contrib import server_external_events
 from oslo.config import cfg
@@ -22,7 +23,6 @@ from neutron.common import constants
 from neutron import context
 from neutron import manager
 from neutron.openstack.common import log as logging
-from neutron.openstack.common import loopingcall
 from neutron.openstack.common import uuidutils
 
 
@@ -52,8 +52,44 @@ class Notifier(object):
             region_name=cfg.CONF.nova_region_name,
             extensions=[server_external_events])
         self.pending_events = []
-        event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events)
-        event_sender.start(interval=cfg.CONF.send_events_interval)
+        self._waiting_to_send = False
+
+    def queue_event(self, event):
+        """Called to queue sending an event with the next batch of events.
+
+        Sending events individually, as they occur, has been problematic as it
+        can result in a flood of sends.  Previously, there was a loopingcall
+        thread that would send batched events on a periodic interval.  However,
+        maintaining a persistent thread in the loopingcall was also
+        problematic.
+
+        This replaces the loopingcall with a mechanism that creates a
+        short-lived thread on demand when the first event is queued.  That
+        thread will sleep once for the same send_events_interval to allow other
+        events to queue up in pending_events and then will send them when it
+        wakes.
+
+        If a thread is already alive and waiting, this call will simply queue
+        the event and return leaving it up to the thread to send it.
+
+        :param event: the event that occured.
+        """
+        if not event:
+            return
+
+        self.pending_events.append(event)
+
+        if self._waiting_to_send:
+            return
+
+        self._waiting_to_send = True
+
+        def last_out_sends():
+            eventlet.sleep(cfg.CONF.send_events_interval)
+            self._waiting_to_send = False
+            self.send_events()
+
+        eventlet.spawn_n(last_out_sends)
 
     def _is_compute_port(self, port):
         try:
@@ -91,8 +127,7 @@ class Notifier(object):
 
         event = self.create_port_changed_event(action, original_obj,
                                                returned_obj)
-        if event:
-            self.pending_events.append(event)
+        self.queue_event(event)
 
     def create_port_changed_event(self, action, original_obj, returned_obj):
         port = None
@@ -169,8 +204,7 @@ class Notifier(object):
 
     def send_port_status(self, mapper, connection, port):
         event = getattr(port, "_notify_event", None)
-        if event:
-            self.pending_events.append(event)
+        self.queue_event(event)
         port._notify_event = None
 
     def send_events(self):
index 1fbdabf25af8532ae44bf9342dd607fe044c861f..be58980bddd683706be5c8b8f962055ab8297ccb 100644 (file)
@@ -266,3 +266,32 @@ class TestNovaNotify(base.BaseTestCase):
             self.nova_notifier.pending_events.append(
                 {'name': 'network-changed', 'server_uuid': device_id})
             self.nova_notifier.send_events()
+
+    def test_queue_event_no_event(self):
+        with mock.patch('eventlet.spawn_n') as spawn_n:
+            self.nova_notifier.queue_event(None)
+            self.assertEqual(0, len(self.nova_notifier.pending_events))
+            self.assertEqual(0, spawn_n.call_count)
+
+    def test_queue_event_first_event(self):
+        with mock.patch('eventlet.spawn_n') as spawn_n:
+            self.nova_notifier.queue_event(mock.Mock())
+            self.assertEqual(1, len(self.nova_notifier.pending_events))
+            self.assertEqual(1, spawn_n.call_count)
+
+    def test_queue_event_multiple_events(self):
+        with mock.patch('eventlet.spawn_n') as spawn_n:
+            events = 6
+            for i in range(0, events):
+                self.nova_notifier.queue_event(mock.Mock())
+            self.assertEqual(events, len(self.nova_notifier.pending_events))
+            self.assertEqual(1, spawn_n.call_count)
+
+    def test_queue_event_call_send_events(self):
+        with mock.patch.object(self.nova_notifier,
+                               'send_events') as send_events:
+            with mock.patch('eventlet.spawn_n') as spawn_n:
+                spawn_n.side_effect = lambda func: func()
+                self.nova_notifier.queue_event(mock.Mock())
+                self.assertFalse(self.nova_notifier._waiting_to_send)
+                send_events.assert_called_once_with()