From: Carl Baldwin Date: Wed, 2 Apr 2014 16:53:33 +0000 (+0000) Subject: Replace loopingcall in notifier with a delayed send X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=3be1d7a75c5ec754825e99e0a8d95b4e1521ae4b;p=openstack-build%2Fneutron-build.git Replace loopingcall in notifier with a delayed send The loopingcall thread here was started before processes fork and so the thread stops working after the fork call. This is a problem that will probably need to be worked out in the long run. To ensure that this notifier works correctly in all processes, this change replaces the persistent loopingcall thread with a thread created on demand to delay and batch up notifications. The first notification will trigger spawning the thread to wait to send it. Any notifications that come in the meantime will notice that there is already a thread waiting to send and will return without spawning. Change-Id: I519d4e89b8cee341c0e1cfffbce3e77151e8202a Closes-Bug: #1301035 --- diff --git a/neutron/notifiers/nova.py b/neutron/notifiers/nova.py index bcb253ccf..1237a16b7 100644 --- a/neutron/notifiers/nova.py +++ b/neutron/notifiers/nova.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet from novaclient import exceptions as nova_exceptions import novaclient.v1_1.client as nclient from novaclient.v1_1.contrib import server_external_events @@ -23,7 +24,6 @@ from neutron.common import constants from neutron import context from neutron import manager from neutron.openstack.common import log as logging -from neutron.openstack.common import loopingcall from neutron.openstack.common import uuidutils @@ -55,8 +55,44 @@ class Notifier(object): region_name=cfg.CONF.nova_region_name, extensions=[server_external_events]) self.pending_events = [] - event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events) - event_sender.start(interval=cfg.CONF.send_events_interval) + self._waiting_to_send = False + + def queue_event(self, event): + """Called to queue sending an event with the next batch of events. + + Sending events individually, as they occur, has been problematic as it + can result in a flood of sends. Previously, there was a loopingcall + thread that would send batched events on a periodic interval. However, + maintaining a persistent thread in the loopingcall was also + problematic. + + This replaces the loopingcall with a mechanism that creates a + short-lived thread on demand when the first event is queued. That + thread will sleep once for the same send_events_interval to allow other + events to queue up in pending_events and then will send them when it + wakes. + + If a thread is already alive and waiting, this call will simply queue + the event and return leaving it up to the thread to send it. + + :param event: the event that occured. + """ + if not event: + return + + self.pending_events.append(event) + + if self._waiting_to_send: + return + + self._waiting_to_send = True + + def last_out_sends(): + eventlet.sleep(cfg.CONF.send_events_interval) + self._waiting_to_send = False + self.send_events() + + eventlet.spawn_n(last_out_sends) def _is_compute_port(self, port): try: @@ -94,8 +130,7 @@ class Notifier(object): event = self.create_port_changed_event(action, original_obj, returned_obj) - if event: - self.pending_events.append(event) + self.queue_event(event) def create_port_changed_event(self, action, original_obj, returned_obj): port = None @@ -172,8 +207,7 @@ class Notifier(object): def send_port_status(self, mapper, connection, port): event = getattr(port, "_notify_event", None) - if event: - self.pending_events.append(event) + self.queue_event(event) port._notify_event = None def send_events(self): diff --git a/neutron/tests/unit/notifiers/test_notifiers_nova.py b/neutron/tests/unit/notifiers/test_notifiers_nova.py index eb32aef6d..7972ebf55 100644 --- a/neutron/tests/unit/notifiers/test_notifiers_nova.py +++ b/neutron/tests/unit/notifiers/test_notifiers_nova.py @@ -274,3 +274,32 @@ class TestNovaNotify(base.BaseTestCase): self.nova_notifier.pending_events.append( {'name': 'network-changed', 'server_uuid': device_id}) self.nova_notifier.send_events() + + def test_queue_event_no_event(self): + with mock.patch('eventlet.spawn_n') as spawn_n: + self.nova_notifier.queue_event(None) + self.assertEqual(0, len(self.nova_notifier.pending_events)) + self.assertEqual(0, spawn_n.call_count) + + def test_queue_event_first_event(self): + with mock.patch('eventlet.spawn_n') as spawn_n: + self.nova_notifier.queue_event(mock.Mock()) + self.assertEqual(1, len(self.nova_notifier.pending_events)) + self.assertEqual(1, spawn_n.call_count) + + def test_queue_event_multiple_events(self): + with mock.patch('eventlet.spawn_n') as spawn_n: + events = 6 + for i in range(0, events): + self.nova_notifier.queue_event(mock.Mock()) + self.assertEqual(events, len(self.nova_notifier.pending_events)) + self.assertEqual(1, spawn_n.call_count) + + def test_queue_event_call_send_events(self): + with mock.patch.object(self.nova_notifier, + 'send_events') as send_events: + with mock.patch('eventlet.spawn_n') as spawn_n: + spawn_n.side_effect = lambda func: func() + self.nova_notifier.queue_event(mock.Mock()) + self.assertFalse(self.nova_notifier._waiting_to_send) + send_events.assert_called_once_with()