]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Reuse nova batch notifier
authorAssaf Muller <amuller@redhat.com>
Mon, 13 Oct 2014 16:24:01 +0000 (19:24 +0300)
committerAssaf Muller <amuller@redhat.com>
Fri, 20 Mar 2015 13:55:08 +0000 (13:55 +0000)
Refactor the batch notifier currently used by the Nova notifier
into a separate class. It will be reused when batching L3 HA
state change events.

Partially-Implements: blueprint report-ha-router-master
Change-Id: I2f8cf261f48bdb632ac0bd643a337290b5297fce

neutron/notifiers/batch_notifier.py [new file with mode: 0644]
neutron/notifiers/nova.py
neutron/tests/unit/notifiers/test_batch_notifier.py [new file with mode: 0644]
neutron/tests/unit/notifiers/test_notifiers_nova.py

diff --git a/neutron/notifiers/batch_notifier.py b/neutron/notifiers/batch_notifier.py
new file mode 100644 (file)
index 0000000..0396042
--- /dev/null
@@ -0,0 +1,66 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+
+
+class BatchNotifier(object):
+    def __init__(self, batch_interval, callback):
+        self.pending_events = []
+        self._waiting_to_send = False
+        self.callback = callback
+        self.batch_interval = batch_interval
+
+    def queue_event(self, event):
+        """Called to queue sending an event with the next batch of events.
+
+        Sending events individually, as they occur, has been problematic as it
+        can result in a flood of sends.  Previously, there was a loopingcall
+        thread that would send batched events on a periodic interval.  However,
+        maintaining a persistent thread in the loopingcall was also
+        problematic.
+
+        This replaces the loopingcall with a mechanism that creates a
+        short-lived thread on demand when the first event is queued.  That
+        thread will sleep once for the same batch_duration to allow other
+        events to queue up in pending_events and then will send them when it
+        wakes.
+
+        If a thread is already alive and waiting, this call will simply queue
+        the event and return leaving it up to the thread to send it.
+
+        :param event: the event that occurred.
+        """
+        if not event:
+            return
+
+        self.pending_events.append(event)
+
+        if self._waiting_to_send:
+            return
+
+        self._waiting_to_send = True
+
+        def last_out_sends():
+            eventlet.sleep(self.batch_interval)
+            self._waiting_to_send = False
+            self._notify()
+
+        eventlet.spawn_n(last_out_sends)
+
+    def _notify(self):
+        if not self.pending_events:
+            return
+
+        batched_events = self.pending_events
+        self.pending_events = []
+        self.callback(batched_events)
index c31111f64de8f3107f3ec254fc57580881a4906c..4bad6dcbadd4de902aaf33d4f3ec8946f6965044 100644 (file)
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import eventlet
 from keystoneclient import auth as ks_auth
 from keystoneclient.auth.identity import v2 as v2_auth
 from keystoneclient import session as ks_session
@@ -28,6 +27,7 @@ from neutron.common import constants
 from neutron import context
 from neutron.i18n import _LE, _LI, _LW
 from neutron import manager
+from neutron.notifiers import batch_notifier
 from neutron.openstack.common import uuidutils
 
 
@@ -105,45 +105,8 @@ class Notifier(object):
             session=session,
             region_name=cfg.CONF.nova.region_name,
             extensions=[server_external_events])
-        self.pending_events = []
-        self._waiting_to_send = False
-
-    def queue_event(self, event):
-        """Called to queue sending an event with the next batch of events.
-
-        Sending events individually, as they occur, has been problematic as it
-        can result in a flood of sends.  Previously, there was a loopingcall
-        thread that would send batched events on a periodic interval.  However,
-        maintaining a persistent thread in the loopingcall was also
-        problematic.
-
-        This replaces the loopingcall with a mechanism that creates a
-        short-lived thread on demand when the first event is queued.  That
-        thread will sleep once for the same send_events_interval to allow other
-        events to queue up in pending_events and then will send them when it
-        wakes.
-
-        If a thread is already alive and waiting, this call will simply queue
-        the event and return leaving it up to the thread to send it.
-
-        :param event: the event that occurred.
-        """
-        if not event:
-            return
-
-        self.pending_events.append(event)
-
-        if self._waiting_to_send:
-            return
-
-        self._waiting_to_send = True
-
-        def last_out_sends():
-            eventlet.sleep(cfg.CONF.send_events_interval)
-            self._waiting_to_send = False
-            self.send_events()
-
-        eventlet.spawn_n(last_out_sends)
+        self.batch_notifier = batch_notifier.BatchNotifier(
+            cfg.CONF.send_events_interval, self.send_events)
 
     def _is_compute_port(self, port):
         try:
@@ -189,11 +152,11 @@ class Notifier(object):
             disassociate_returned_obj = {'floatingip': {'port_id': None}}
             event = self.create_port_changed_event(action, original_obj,
                                                    disassociate_returned_obj)
-            self.queue_event(event)
+            self.batch_notifier.queue_event(event)
 
         event = self.create_port_changed_event(action, original_obj,
                                                returned_obj)
-        self.queue_event(event)
+        self.batch_notifier.queue_event(event)
 
     def create_port_changed_event(self, action, original_obj, returned_obj):
         port = None
@@ -270,16 +233,10 @@ class Notifier(object):
 
     def send_port_status(self, mapper, connection, port):
         event = getattr(port, "_notify_event", None)
-        self.queue_event(event)
+        self.batch_notifier.queue_event(event)
         port._notify_event = None
 
-    def send_events(self):
-        if not self.pending_events:
-            return
-
-        batched_events = self.pending_events
-        self.pending_events = []
-
+    def send_events(self, batched_events):
         LOG.debug("Sending events: %s", batched_events)
         try:
             response = self.nclient.server_external_events.create(
diff --git a/neutron/tests/unit/notifiers/test_batch_notifier.py b/neutron/tests/unit/notifiers/test_batch_notifier.py
new file mode 100644 (file)
index 0000000..23bede8
--- /dev/null
@@ -0,0 +1,51 @@
+# Copyright (c) 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from neutron.notifiers import batch_notifier
+from neutron.tests import base
+
+
+class TestBatchNotifier(base.BaseTestCase):
+    def setUp(self):
+        super(TestBatchNotifier, self).setUp()
+        self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
+        self.spawn_n = mock.patch('eventlet.spawn_n').start()
+
+    def test_queue_event_no_event(self):
+        self.notifier.queue_event(None)
+        self.assertEqual(0, len(self.notifier.pending_events))
+        self.assertEqual(0, self.spawn_n.call_count)
+
+    def test_queue_event_first_event(self):
+        self.notifier.queue_event(mock.Mock())
+        self.assertEqual(1, len(self.notifier.pending_events))
+        self.assertEqual(1, self.spawn_n.call_count)
+
+    def test_queue_event_multiple_events(self):
+        events = 6
+        for i in range(0, events):
+            self.notifier.queue_event(mock.Mock())
+        self.assertEqual(events, len(self.notifier.pending_events))
+        self.assertEqual(1, self.spawn_n.call_count)
+
+    def test_queue_event_call_send_events(self):
+        with mock.patch.object(self.notifier,
+                               'callback') as send_events:
+            self.spawn_n.side_effect = lambda func: func()
+            self.notifier.queue_event(mock.Mock())
+            self.assertFalse(self.notifier._waiting_to_send)
+            self.assertTrue(send_events.called)
index afae75ec3c5f2efe92c8a32dfc192bf107fcfd17..49ccb975ae7d5a851ba9619d630d949535856dba 100644 (file)
@@ -218,21 +218,21 @@ class TestNovaNotify(base.BaseTestCase):
             self.nova_notifier.nclient.server_external_events,
                 'create') as nclient_create:
             nclient_create.return_value = 'i am a string!'
-            self.nova_notifier.send_events()
+            self.nova_notifier.send_events([])
 
     def test_nova_send_event_rasies_404(self):
         with mock.patch.object(
             self.nova_notifier.nclient.server_external_events,
                 'create') as nclient_create:
             nclient_create.side_effect = nova_exceptions.NotFound
-            self.nova_notifier.send_events()
+            self.nova_notifier.send_events([])
 
     def test_nova_send_events_raises(self):
         with mock.patch.object(
             self.nova_notifier.nclient.server_external_events,
                 'create') as nclient_create:
             nclient_create.side_effect = Exception
-            self.nova_notifier.send_events()
+            self.nova_notifier.send_events([])
 
     def test_nova_send_events_returns_non_200(self):
         device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
@@ -242,9 +242,8 @@ class TestNovaNotify(base.BaseTestCase):
             nclient_create.return_value = [{'code': 404,
                                             'name': 'network-changed',
                                             'server_uuid': device_id}]
-            self.nova_notifier.pending_events.append(
-                {'name': 'network-changed', 'server_uuid': device_id})
-            self.nova_notifier.send_events()
+            self.nova_notifier.send_events(
+                [{'name': 'network-changed', 'server_uuid': device_id}])
 
     def test_nova_send_events_return_200(self):
         device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
@@ -254,9 +253,8 @@ class TestNovaNotify(base.BaseTestCase):
             nclient_create.return_value = [{'code': 200,
                                             'name': 'network-changed',
                                             'server_uuid': device_id}]
-            self.nova_notifier.pending_events.append(
-                {'name': 'network-changed', 'server_uuid': device_id})
-            self.nova_notifier.send_events()
+            self.nova_notifier.send_events(
+                [{'name': 'network-changed', 'server_uuid': device_id}])
 
     def test_nova_send_events_multiple(self):
         device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
@@ -269,40 +267,9 @@ class TestNovaNotify(base.BaseTestCase):
                                            {'code': 200,
                                             'name': 'network-changed',
                                             'server_uuid': device_id}]
-            self.nova_notifier.pending_events.append(
-                {'name': 'network-changed', 'server_uuid': device_id})
-            self.nova_notifier.pending_events.append(
-                {'name': 'network-changed', 'server_uuid': device_id})
-            self.nova_notifier.send_events()
-
-    def test_queue_event_no_event(self):
-        with mock.patch('eventlet.spawn_n') as spawn_n:
-            self.nova_notifier.queue_event(None)
-            self.assertEqual(0, len(self.nova_notifier.pending_events))
-            self.assertEqual(0, spawn_n.call_count)
-
-    def test_queue_event_first_event(self):
-        with mock.patch('eventlet.spawn_n') as spawn_n:
-            self.nova_notifier.queue_event(mock.Mock())
-            self.assertEqual(1, len(self.nova_notifier.pending_events))
-            self.assertEqual(1, spawn_n.call_count)
-
-    def test_queue_event_multiple_events(self):
-        with mock.patch('eventlet.spawn_n') as spawn_n:
-            events = 6
-            for i in range(0, events):
-                self.nova_notifier.queue_event(mock.Mock())
-            self.assertEqual(events, len(self.nova_notifier.pending_events))
-            self.assertEqual(1, spawn_n.call_count)
-
-    def test_queue_event_call_send_events(self):
-        with mock.patch.object(self.nova_notifier,
-                               'send_events') as send_events:
-            with mock.patch('eventlet.spawn_n') as spawn_n:
-                spawn_n.side_effect = lambda func: func()
-                self.nova_notifier.queue_event(mock.Mock())
-                self.assertFalse(self.nova_notifier._waiting_to_send)
-                send_events.assert_called_once_with()
+            self.nova_notifier.send_events([
+                {'name': 'network-changed', 'server_uuid': device_id},
+                {'name': 'network-changed', 'server_uuid': device_id}])
 
     def test_reassociate_floatingip_without_disassociate_event(self):
         returned_obj = {'floatingip':
@@ -311,12 +278,15 @@ class TestNovaNotify(base.BaseTestCase):
         self.nova_notifier._waiting_to_send = True
         self.nova_notifier.send_network_change(
             'update_floatingip', original_obj, returned_obj)
-        self.assertEqual(2, len(self.nova_notifier.pending_events))
+        self.assertEqual(
+            2, len(self.nova_notifier.batch_notifier.pending_events))
 
         returned_obj_non = {'floatingip': {'port_id': None}}
         event_dis = self.nova_notifier.create_port_changed_event(
             'update_floatingip', original_obj, returned_obj_non)
         event_assoc = self.nova_notifier.create_port_changed_event(
             'update_floatingip', original_obj, returned_obj)
-        self.assertEqual(self.nova_notifier.pending_events[0], event_dis)
-        self.assertEqual(self.nova_notifier.pending_events[1], event_assoc)
+        self.assertEqual(
+            self.nova_notifier.batch_notifier.pending_events[0], event_dis)
+        self.assertEqual(
+            self.nova_notifier.batch_notifier.pending_events[1], event_assoc)