# License for the specific language governing permissions and limitations
# under the License.
+import eventlet
import novaclient.v1_1.client as nclient
from novaclient.v1_1.contrib import server_external_events
from oslo.config import cfg
from neutron import context
from neutron import manager
from neutron.openstack.common import log as logging
-from neutron.openstack.common import loopingcall
from neutron.openstack.common import uuidutils
region_name=cfg.CONF.nova_region_name,
extensions=[server_external_events])
self.pending_events = []
- event_sender = loopingcall.FixedIntervalLoopingCall(self.send_events)
- event_sender.start(interval=cfg.CONF.send_events_interval)
+ self._waiting_to_send = False
+
+ def queue_event(self, event):
+ """Called to queue sending an event with the next batch of events.
+
+ Sending events individually, as they occur, has been problematic as it
+ can result in a flood of sends. Previously, there was a loopingcall
+ thread that would send batched events on a periodic interval. However,
+ maintaining a persistent thread in the loopingcall was also
+ problematic.
+
+ This replaces the loopingcall with a mechanism that creates a
+ short-lived thread on demand when the first event is queued. That
+ thread will sleep once for the same send_events_interval to allow other
+ events to queue up in pending_events and then will send them when it
+ wakes.
+
+ If a thread is already alive and waiting, this call will simply queue
+ the event and return leaving it up to the thread to send it.
+
+ :param event: the event that occured.
+ """
+ if not event:
+ return
+
+ self.pending_events.append(event)
+
+ if self._waiting_to_send:
+ return
+
+ self._waiting_to_send = True
+
+ def last_out_sends():
+ eventlet.sleep(cfg.CONF.send_events_interval)
+ self._waiting_to_send = False
+ self.send_events()
+
+ eventlet.spawn_n(last_out_sends)
def _is_compute_port(self, port):
try:
event = self.create_port_changed_event(action, original_obj,
returned_obj)
- if event:
- self.pending_events.append(event)
+ self.queue_event(event)
def create_port_changed_event(self, action, original_obj, returned_obj):
port = None
def send_port_status(self, mapper, connection, port):
event = getattr(port, "_notify_event", None)
- if event:
- self.pending_events.append(event)
+ self.queue_event(event)
port._notify_event = None
def send_events(self):
self.nova_notifier.pending_events.append(
{'name': 'network-changed', 'server_uuid': device_id})
self.nova_notifier.send_events()
+
+ def test_queue_event_no_event(self):
+ with mock.patch('eventlet.spawn_n') as spawn_n:
+ self.nova_notifier.queue_event(None)
+ self.assertEqual(0, len(self.nova_notifier.pending_events))
+ self.assertEqual(0, spawn_n.call_count)
+
+ def test_queue_event_first_event(self):
+ with mock.patch('eventlet.spawn_n') as spawn_n:
+ self.nova_notifier.queue_event(mock.Mock())
+ self.assertEqual(1, len(self.nova_notifier.pending_events))
+ self.assertEqual(1, spawn_n.call_count)
+
+ def test_queue_event_multiple_events(self):
+ with mock.patch('eventlet.spawn_n') as spawn_n:
+ events = 6
+ for i in range(0, events):
+ self.nova_notifier.queue_event(mock.Mock())
+ self.assertEqual(events, len(self.nova_notifier.pending_events))
+ self.assertEqual(1, spawn_n.call_count)
+
+ def test_queue_event_call_send_events(self):
+ with mock.patch.object(self.nova_notifier,
+ 'send_events') as send_events:
+ with mock.patch('eventlet.spawn_n') as spawn_n:
+ spawn_n.side_effect = lambda func: func()
+ self.nova_notifier.queue_event(mock.Mock())
+ self.assertFalse(self.nova_notifier._waiting_to_send)
+ send_events.assert_called_once_with()