]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Replace loopingcall in notifier with a delayed send
authorCarl Baldwin <carl.baldwin@hp.com>
Wed, 2 Apr 2014 16:53:33 +0000 (16:53 +0000)
committerCarl Baldwin <carl.baldwin@hp.com>
Tue, 22 Apr 2014 16:22:02 +0000 (16:22 +0000)
The loopingcall thread here was started before processes fork and so
the thread stops working after the fork call.  This is a problem that
will probably need to be worked out in the long run.

To ensure that this notifier works correctly in all processes, this
change replaces the persistent loopingcall thread with a thread
created on demand to delay and batch up notifications.  The first
notification will trigger spawning the thread to wait to send it.  Any
notifications that come in the meantime will notice that there is
already a thread waiting to send and will return without spawning.

Change-Id: I519d4e89b8cee341c0e1cfffbce3e77151e8202a
Closes-Bug: #1301035

neutron/notifiers/nova.py
neutron/tests/unit/notifiers/test_notifiers_nova.py

index bcb253ccfa7ea536e111637405fde2669a6637db..1237a16b74f94aebc2737190f0d1b54b3bd3be72 100644 (file)
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import eventlet
 from novaclient import exceptions as nova_exceptions
 import novaclient.v1_1.client as nclient
 from novaclient.v1_1.contrib import server_external_events
@@ -23,7 +24,6 @@ from neutron.common import constants
 from neutron import context
 from neutron import manager
 from neutron.openstack.common import log as logging
-from neutron.openstack.common import loopingcall
 from neutron.openstack.common import uuidutils
 
 
@@ -55,8 +55,44 @@ class Notifier(object):
             region_name=cfg.CONF.nova_region_name,
             extensions=[server_external_events])
         self.pending_events = []
-        event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events)
-        event_sender.start(interval=cfg.CONF.send_events_interval)
+        self._waiting_to_send = False
+
+    def queue_event(self, event):
+        """Called to queue sending an event with the next batch of events.
+
+        Sending events individually, as they occur, has been problematic as it
+        can result in a flood of sends.  Previously, there was a loopingcall
+        thread that would send batched events on a periodic interval.  However,
+        maintaining a persistent thread in the loopingcall was also
+        problematic.
+
+        This replaces the loopingcall with a mechanism that creates a
+        short-lived thread on demand when the first event is queued.  That
+        thread will sleep once for the same send_events_interval to allow other
+        events to queue up in pending_events and then will send them when it
+        wakes.
+
+        If a thread is already alive and waiting, this call will simply queue
+        the event and return leaving it up to the thread to send it.
+
+        :param event: the event that occured.
+        """
+        if not event:
+            return
+
+        self.pending_events.append(event)
+
+        if self._waiting_to_send:
+            return
+
+        self._waiting_to_send = True
+
+        def last_out_sends():
+            eventlet.sleep(cfg.CONF.send_events_interval)
+            self._waiting_to_send = False
+            self.send_events()
+
+        eventlet.spawn_n(last_out_sends)
 
     def _is_compute_port(self, port):
         try:
@@ -94,8 +130,7 @@ class Notifier(object):
 
         event = self.create_port_changed_event(action, original_obj,
                                                returned_obj)
-        if event:
-            self.pending_events.append(event)
+        self.queue_event(event)
 
     def create_port_changed_event(self, action, original_obj, returned_obj):
         port = None
@@ -172,8 +207,7 @@ class Notifier(object):
 
     def send_port_status(self, mapper, connection, port):
         event = getattr(port, "_notify_event", None)
-        if event:
-            self.pending_events.append(event)
+        self.queue_event(event)
         port._notify_event = None
 
     def send_events(self):
index eb32aef6dff24f7c84913154cc3c61583fac3a89..7972ebf55a975cd9839b8ae7beeb5427a3093d34 100644 (file)
@@ -274,3 +274,32 @@ class TestNovaNotify(base.BaseTestCase):
             self.nova_notifier.pending_events.append(
                 {'name': 'network-changed', 'server_uuid': device_id})
             self.nova_notifier.send_events()
+
+    def test_queue_event_no_event(self):
+        with mock.patch('eventlet.spawn_n') as spawn_n:
+            self.nova_notifier.queue_event(None)
+            self.assertEqual(0, len(self.nova_notifier.pending_events))
+            self.assertEqual(0, spawn_n.call_count)
+
+    def test_queue_event_first_event(self):
+        with mock.patch('eventlet.spawn_n') as spawn_n:
+            self.nova_notifier.queue_event(mock.Mock())
+            self.assertEqual(1, len(self.nova_notifier.pending_events))
+            self.assertEqual(1, spawn_n.call_count)
+
+    def test_queue_event_multiple_events(self):
+        with mock.patch('eventlet.spawn_n') as spawn_n:
+            events = 6
+            for i in range(0, events):
+                self.nova_notifier.queue_event(mock.Mock())
+            self.assertEqual(events, len(self.nova_notifier.pending_events))
+            self.assertEqual(1, spawn_n.call_count)
+
+    def test_queue_event_call_send_events(self):
+        with mock.patch.object(self.nova_notifier,
+                               'send_events') as send_events:
+            with mock.patch('eventlet.spawn_n') as spawn_n:
+                spawn_n.side_effect = lambda func: func()
+                self.nova_notifier.queue_event(mock.Mock())
+                self.assertFalse(self.nova_notifier._waiting_to_send)
+                send_events.assert_called_once_with()