--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+
+
+class BatchNotifier(object):
+ def __init__(self, batch_interval, callback):
+ self.pending_events = []
+ self._waiting_to_send = False
+ self.callback = callback
+ self.batch_interval = batch_interval
+
+ def queue_event(self, event):
+ """Called to queue sending an event with the next batch of events.
+
+ Sending events individually, as they occur, has been problematic as it
+ can result in a flood of sends. Previously, there was a loopingcall
+ thread that would send batched events on a periodic interval. However,
+ maintaining a persistent thread in the loopingcall was also
+ problematic.
+
+ This replaces the loopingcall with a mechanism that creates a
+ short-lived thread on demand when the first event is queued. That
+ thread will sleep once for the same batch_duration to allow other
+ events to queue up in pending_events and then will send them when it
+ wakes.
+
+ If a thread is already alive and waiting, this call will simply queue
+ the event and return leaving it up to the thread to send it.
+
+ :param event: the event that occurred.
+ """
+ if not event:
+ return
+
+ self.pending_events.append(event)
+
+ if self._waiting_to_send:
+ return
+
+ self._waiting_to_send = True
+
+ def last_out_sends():
+ eventlet.sleep(self.batch_interval)
+ self._waiting_to_send = False
+ self._notify()
+
+ eventlet.spawn_n(last_out_sends)
+
+ def _notify(self):
+ if not self.pending_events:
+ return
+
+ batched_events = self.pending_events
+ self.pending_events = []
+ self.callback(batched_events)
# License for the specific language governing permissions and limitations
# under the License.
-import eventlet
from keystoneclient import auth as ks_auth
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient import session as ks_session
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
+from neutron.notifiers import batch_notifier
from neutron.openstack.common import uuidutils
session=session,
region_name=cfg.CONF.nova.region_name,
extensions=[server_external_events])
- self.pending_events = []
- self._waiting_to_send = False
-
- def queue_event(self, event):
- """Called to queue sending an event with the next batch of events.
-
- Sending events individually, as they occur, has been problematic as it
- can result in a flood of sends. Previously, there was a loopingcall
- thread that would send batched events on a periodic interval. However,
- maintaining a persistent thread in the loopingcall was also
- problematic.
-
- This replaces the loopingcall with a mechanism that creates a
- short-lived thread on demand when the first event is queued. That
- thread will sleep once for the same send_events_interval to allow other
- events to queue up in pending_events and then will send them when it
- wakes.
-
- If a thread is already alive and waiting, this call will simply queue
- the event and return leaving it up to the thread to send it.
-
- :param event: the event that occurred.
- """
- if not event:
- return
-
- self.pending_events.append(event)
-
- if self._waiting_to_send:
- return
-
- self._waiting_to_send = True
-
- def last_out_sends():
- eventlet.sleep(cfg.CONF.send_events_interval)
- self._waiting_to_send = False
- self.send_events()
-
- eventlet.spawn_n(last_out_sends)
+ self.batch_notifier = batch_notifier.BatchNotifier(
+ cfg.CONF.send_events_interval, self.send_events)
def _is_compute_port(self, port):
try:
disassociate_returned_obj = {'floatingip': {'port_id': None}}
event = self.create_port_changed_event(action, original_obj,
disassociate_returned_obj)
- self.queue_event(event)
+ self.batch_notifier.queue_event(event)
event = self.create_port_changed_event(action, original_obj,
returned_obj)
- self.queue_event(event)
+ self.batch_notifier.queue_event(event)
def create_port_changed_event(self, action, original_obj, returned_obj):
port = None
def send_port_status(self, mapper, connection, port):
event = getattr(port, "_notify_event", None)
- self.queue_event(event)
+ self.batch_notifier.queue_event(event)
port._notify_event = None
- def send_events(self):
- if not self.pending_events:
- return
-
- batched_events = self.pending_events
- self.pending_events = []
-
+ def send_events(self, batched_events):
LOG.debug("Sending events: %s", batched_events)
try:
response = self.nclient.server_external_events.create(
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.notifiers import batch_notifier
+from neutron.tests import base
+
+
+class TestBatchNotifier(base.BaseTestCase):
+ def setUp(self):
+ super(TestBatchNotifier, self).setUp()
+ self.notifier = batch_notifier.BatchNotifier(0.1, lambda x: x)
+ self.spawn_n = mock.patch('eventlet.spawn_n').start()
+
+ def test_queue_event_no_event(self):
+ self.notifier.queue_event(None)
+ self.assertEqual(0, len(self.notifier.pending_events))
+ self.assertEqual(0, self.spawn_n.call_count)
+
+ def test_queue_event_first_event(self):
+ self.notifier.queue_event(mock.Mock())
+ self.assertEqual(1, len(self.notifier.pending_events))
+ self.assertEqual(1, self.spawn_n.call_count)
+
+ def test_queue_event_multiple_events(self):
+ events = 6
+ for i in range(0, events):
+ self.notifier.queue_event(mock.Mock())
+ self.assertEqual(events, len(self.notifier.pending_events))
+ self.assertEqual(1, self.spawn_n.call_count)
+
+ def test_queue_event_call_send_events(self):
+ with mock.patch.object(self.notifier,
+ 'callback') as send_events:
+ self.spawn_n.side_effect = lambda func: func()
+ self.notifier.queue_event(mock.Mock())
+ self.assertFalse(self.notifier._waiting_to_send)
+ self.assertTrue(send_events.called)
self.nova_notifier.nclient.server_external_events,
'create') as nclient_create:
nclient_create.return_value = 'i am a string!'
- self.nova_notifier.send_events()
+ self.nova_notifier.send_events([])
def test_nova_send_event_rasies_404(self):
with mock.patch.object(
self.nova_notifier.nclient.server_external_events,
'create') as nclient_create:
nclient_create.side_effect = nova_exceptions.NotFound
- self.nova_notifier.send_events()
+ self.nova_notifier.send_events([])
def test_nova_send_events_raises(self):
with mock.patch.object(
self.nova_notifier.nclient.server_external_events,
'create') as nclient_create:
nclient_create.side_effect = Exception
- self.nova_notifier.send_events()
+ self.nova_notifier.send_events([])
def test_nova_send_events_returns_non_200(self):
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
nclient_create.return_value = [{'code': 404,
'name': 'network-changed',
'server_uuid': device_id}]
- self.nova_notifier.pending_events.append(
- {'name': 'network-changed', 'server_uuid': device_id})
- self.nova_notifier.send_events()
+ self.nova_notifier.send_events(
+ [{'name': 'network-changed', 'server_uuid': device_id}])
def test_nova_send_events_return_200(self):
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
nclient_create.return_value = [{'code': 200,
'name': 'network-changed',
'server_uuid': device_id}]
- self.nova_notifier.pending_events.append(
- {'name': 'network-changed', 'server_uuid': device_id})
- self.nova_notifier.send_events()
+ self.nova_notifier.send_events(
+ [{'name': 'network-changed', 'server_uuid': device_id}])
def test_nova_send_events_multiple(self):
device_id = '32102d7b-1cf4-404d-b50a-97aae1f55f87'
{'code': 200,
'name': 'network-changed',
'server_uuid': device_id}]
- self.nova_notifier.pending_events.append(
- {'name': 'network-changed', 'server_uuid': device_id})
- self.nova_notifier.pending_events.append(
- {'name': 'network-changed', 'server_uuid': device_id})
- self.nova_notifier.send_events()
-
- def test_queue_event_no_event(self):
- with mock.patch('eventlet.spawn_n') as spawn_n:
- self.nova_notifier.queue_event(None)
- self.assertEqual(0, len(self.nova_notifier.pending_events))
- self.assertEqual(0, spawn_n.call_count)
-
- def test_queue_event_first_event(self):
- with mock.patch('eventlet.spawn_n') as spawn_n:
- self.nova_notifier.queue_event(mock.Mock())
- self.assertEqual(1, len(self.nova_notifier.pending_events))
- self.assertEqual(1, spawn_n.call_count)
-
- def test_queue_event_multiple_events(self):
- with mock.patch('eventlet.spawn_n') as spawn_n:
- events = 6
- for i in range(0, events):
- self.nova_notifier.queue_event(mock.Mock())
- self.assertEqual(events, len(self.nova_notifier.pending_events))
- self.assertEqual(1, spawn_n.call_count)
-
- def test_queue_event_call_send_events(self):
- with mock.patch.object(self.nova_notifier,
- 'send_events') as send_events:
- with mock.patch('eventlet.spawn_n') as spawn_n:
- spawn_n.side_effect = lambda func: func()
- self.nova_notifier.queue_event(mock.Mock())
- self.assertFalse(self.nova_notifier._waiting_to_send)
- send_events.assert_called_once_with()
+ self.nova_notifier.send_events([
+ {'name': 'network-changed', 'server_uuid': device_id},
+ {'name': 'network-changed', 'server_uuid': device_id}])
def test_reassociate_floatingip_without_disassociate_event(self):
returned_obj = {'floatingip':
self.nova_notifier._waiting_to_send = True
self.nova_notifier.send_network_change(
'update_floatingip', original_obj, returned_obj)
- self.assertEqual(2, len(self.nova_notifier.pending_events))
+ self.assertEqual(
+ 2, len(self.nova_notifier.batch_notifier.pending_events))
returned_obj_non = {'floatingip': {'port_id': None}}
event_dis = self.nova_notifier.create_port_changed_event(
'update_floatingip', original_obj, returned_obj_non)
event_assoc = self.nova_notifier.create_port_changed_event(
'update_floatingip', original_obj, returned_obj)
- self.assertEqual(self.nova_notifier.pending_events[0], event_dis)
- self.assertEqual(self.nova_notifier.pending_events[1], event_assoc)
+ self.assertEqual(
+ self.nova_notifier.batch_notifier.pending_events[0], event_dis)
+ self.assertEqual(
+ self.nova_notifier.batch_notifier.pending_events[1], event_assoc)