review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add waiting for the driver to SchedulerManager
author    Michal Dulko <michal.dulko@intel.com>
Thu, 12 Mar 2015 16:24:09 +0000 (17:24 +0100)
committer Michal Dulko <michal.dulko@intel.com>
Thu, 12 Mar 2015 16:24:09 +0000 (17:24 +0100)
This patch adds a _wait_for_scheduler method that is called before serving
any request. The method waits until scheduler.is_ready() returns True or
CONF.periodic_interval seconds have passed since service startup.

Change-Id: I9fab9fb076a955a24c1c157229baf027359d9771
Closes-Bug: 1409012
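
In short, the behavior this introduces: every scheduling request first passes
through a gate that polls the driver once per second, and the gate is forced
open at most CONF.periodic_interval seconds after startup. A minimal
standalone sketch of that pattern (plain time.sleep instead of eventlet and
made-up names; illustrative only, not the patch itself):

import time


class GatedManager(object):
    """Minimal sketch of the startup gate this patch adds."""

    PERIODIC_INTERVAL = 60  # stands in for CONF.periodic_interval

    def __init__(self, driver):
        self.driver = driver         # assumed to expose is_ready() -> bool
        self._startup_delay = True

    def init_host_with_rpc(self):
        # Runs once at service startup and bounds the maximum gate time.
        time.sleep(self.PERIODIC_INTERVAL)
        self._startup_delay = False

    def _wait_for_scheduler(self):
        # Every scheduling entry point calls this first: poll until the
        # driver reports readiness or the startup window closes.
        while self._startup_delay and not self.driver.is_ready():
            time.sleep(1)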

cinder/scheduler/manager.py
cinder/tests/integrated/integrated_helpers.py
cinder/tests/scheduler/test_scheduler.py

index ca0cc868183efd08026fe2d0963f0977d430336f..cc03b15ac8d3a651bec6273909aabc3a54a4b740 100644 (file)
@@ -19,6 +19,7 @@
 Scheduler Service
 """
 
+import eventlet
 from oslo_config import cfg
 from oslo_log import log as logging
 import oslo_messaging as messaging
@@ -75,11 +76,15 @@ class SchedulerManager(manager.Manager):
                 'combination of filters and weighers.'))
         self.driver = importutils.import_object(scheduler_driver)
         super(SchedulerManager, self).__init__(*args, **kwargs)
+        self._startup_delay = True
 
     def init_host_with_rpc(self):
         ctxt = context.get_admin_context()
         self.request_service_capabilities(ctxt)
 
+        eventlet.sleep(CONF.periodic_interval)
+        self._startup_delay = False
+
     def update_service_capabilities(self, context, service_name=None,
                                     host=None, capabilities=None, **kwargs):
         """Process a capability update from a service node."""
@@ -89,10 +94,18 @@ class SchedulerManager(manager.Manager):
                                                 host,
                                                 capabilities)
 
+    def _wait_for_scheduler(self):
+        # NOTE(dulek): Wait until the scheduler announces that it's ready or
+        # CONF.periodic_interval seconds have passed since service startup.
+        while self._startup_delay and not self.driver.is_ready():
+            eventlet.sleep(1)
+
     def create_consistencygroup(self, context, topic,
                                 group_id,
                                 request_spec_list=None,
                                 filter_properties_list=None):
+
+        self._wait_for_scheduler()
         try:
             self.driver.schedule_create_consistencygroup(
                 context, group_id,
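
The loop in _wait_for_scheduler polls self.driver.is_ready() once per second.
That method is not part of this diff; presumably the driver reports ready
once the host manager has received capability reports from the volume
services. A hedged sketch of what the driver side might look like (the
has_all_capabilities() name is an assumption, not confirmed by this commit):

class Scheduler(object):
    """Hypothetical driver-side counterpart; not part of this commit."""

    def __init__(self, host_manager):
        self.host_manager = host_manager

    def is_ready(self):
        # Ready once every known volume service has reported capabilities
        # (assumed host_manager API).
        return self.host_manager.has_all_capabilities()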
@@ -117,6 +130,7 @@ class SchedulerManager(manager.Manager):
                       image_id=None, request_spec=None,
                       filter_properties=None):
 
+        self._wait_for_scheduler()
         try:
             flow_engine = create_volume.get_flow(context,
                                                  db, self.driver,
@@ -142,6 +156,8 @@ class SchedulerManager(manager.Manager):
                                filter_properties=None):
         """Ensure that the host exists and can accept the volume."""
 
+        self._wait_for_scheduler()
+
         def _migrate_volume_set_error(self, context, ex, request_spec):
             volume_state = {'volume_state': {'migration_status': None}}
             self._set_volume_state_and_notify('migrate_volume_to_host',
@@ -173,6 +189,9 @@ class SchedulerManager(manager.Manager):
         :param request_spec: parameters for this retype request
         :param filter_properties: parameters to filter by
         """
+
+        self._wait_for_scheduler()
+
         def _retype_volume_set_error(self, context, ex, request_spec,
                                      volume_ref, msg, reservations):
             if reservations:
@@ -223,6 +242,8 @@ class SchedulerManager(manager.Manager):
                         request_spec, filter_properties=None):
         """Ensure that the host exists and can accept the volume."""
 
+        self._wait_for_scheduler()
+
         def _manage_existing_set_error(self, context, ex, request_spec):
             volume_state = {'volume_state': {'status': 'error'}}
             self._set_volume_state_and_notify('manage_existing', volume_state,
@@ -244,7 +265,12 @@ class SchedulerManager(manager.Manager):
                                                       request_spec.get('ref'))
 
     def get_pools(self, context, filters=None):
-        """Get active pools from scheduler's cache."""
+        """Get active pools from scheduler's cache.
+
+        NOTE(dulek): There's no self._wait_for_scheduler() here, as get_pools
+        is a blocking RPC call (the c-api waits on it) and an admin-only API
+        extension, so it won't hurt the user much to retry manually.
+        """
         return self.driver.get_pools(context, filters)
 
     def _set_volume_state_and_notify(self, method, updates, context, ex,
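
Because get_pools skips the gate, a call made right after scheduler startup
may see an empty or partial pool list until capability reports arrive. As the
docstring note says, an admin can simply retry; a caller-side retry could
look like the sketch below (get_pools_with_retry and its parameters are
illustrative, not part of the patch):

import time


def get_pools_with_retry(scheduler_rpcapi, context, attempts=3, delay=2):
    """Retry get_pools while the scheduler warms up (illustrative helper)."""
    pools = None
    for _ in range(attempts):
        pools = scheduler_rpcapi.get_pools(context, filters=None)
        if pools:
            return pools
        time.sleep(delay)
    return pools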
index 70ed6c18f80ce1149381563cf06732fa14ca0a51..20d6fbff16ff6a48e713c941b6e603e820638055 100644 (file)
@@ -22,6 +22,7 @@ import string
 import uuid
 
 import fixtures
+import mock
 from oslo_log import log as logging
 
 from cinder import service
@@ -69,7 +70,10 @@ class _IntegratedTestBase(test.TestCase):
 
         # set up services
         self.volume = self.start_service('volume')
-        self.scheduler = self.start_service('scheduler')
+        # NOTE(dulek): Mock eventlet.sleep so the test won't time out on
+        # scheduler service start.
+        with mock.patch('eventlet.sleep'):
+            self.scheduler = self.start_service('scheduler')
         self._start_api_service()
         self.addCleanup(self.osapi.stop)
 
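
Without this mock, start_service('scheduler') would sit in init_host_with_rpc
for the full CONF.periodic_interval before returning, stalling every
integrated test's setUp. A hypothetical alternative with the same effect,
using the flags() helper the test base already provides (seen in the
scheduler unit tests below), might be:

        # Hypothetical alternative: shrink the startup delay instead of
        # mocking the sleep away.
        self.flags(periodic_interval=0)
        self.scheduler = self.start_service('scheduler')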
index 784c1fc2b1480f9a0e9effb7e2d4d26b8e0e33a1..623fc3219f64b8aecdc2e38982acae2daa97f4b1 100644 (file)
@@ -49,6 +49,7 @@ class SchedulerManagerTestCase(test.TestCase):
         super(SchedulerManagerTestCase, self).setUp()
         self.flags(scheduler_driver=self.driver_cls_name)
         self.manager = self.manager_cls()
+        self.manager._startup_delay = False
         self.context = context.RequestContext('fake_user', 'fake_project')
         self.topic = 'fake_topic'
         self.fake_args = (1, 2, 3)
@@ -59,6 +60,15 @@ class SchedulerManagerTestCase(test.TestCase):
         manager = self.manager
         self.assertIsInstance(manager.driver, self.driver_cls)
 
+    @mock.patch('eventlet.sleep')
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.publish_service_capabilities')
+    def test_init_host_with_rpc(self, publish_capabilities_mock, sleep_mock):
+        self.manager._startup_delay = True
+        self.manager.init_host_with_rpc()
+        publish_capabilities_mock.assert_called_once_with(mock.ANY)
+        sleep_mock.assert_called_once_with(CONF.periodic_interval)
+        self.assertFalse(self.manager._startup_delay)
+
     @mock.patch('cinder.scheduler.driver.Scheduler.'
                 'update_service_capabilities')
     def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
@@ -105,6 +115,65 @@ class SchedulerManagerTestCase(test.TestCase):
         _mock_sched_create.assert_called_once_with(self.context, request_spec,
                                                    {})
 
+    @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+    @mock.patch('eventlet.sleep')
+    def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create):
+        fake_volume_id = 1
+        topic = 'fake_topic'
+
+        request_spec = {'volume_id': fake_volume_id}
+
+        self.manager.create_volume(self.context, topic, fake_volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
+        _mock_sched_create.assert_called_once_with(self.context, request_spec,
+                                                   {})
+        self.assertFalse(_mock_sleep.called)
+
+    @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+    @mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
+    @mock.patch('eventlet.sleep')
+    def test_create_volume_delay_scheduled_after_3_tries(self, _mock_sleep,
+                                                         _mock_is_ready,
+                                                         _mock_sched_create):
+        self.manager._startup_delay = True
+        fake_volume_id = 1
+        topic = 'fake_topic'
+
+        request_spec = {'volume_id': fake_volume_id}
+
+        _mock_is_ready.side_effect = [False, False, True]
+
+        self.manager.create_volume(self.context, topic, fake_volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
+        _mock_sched_create.assert_called_once_with(self.context, request_spec,
+                                                   {})
+        calls = [mock.call(1)] * 2
+        _mock_sleep.assert_has_calls(calls)
+        self.assertEqual(2, _mock_sleep.call_count)
+
+    @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+    @mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
+    @mock.patch('eventlet.sleep')
+    def test_create_volume_delay_scheduled_in_1_try(self, _mock_sleep,
+                                                    _mock_is_ready,
+                                                    _mock_sched_create):
+        self.manager._startup_delay = True
+        fake_volume_id = 1
+        topic = 'fake_topic'
+
+        request_spec = {'volume_id': fake_volume_id}
+
+        _mock_is_ready.return_value = True
+
+        self.manager.create_volume(self.context, topic, fake_volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
+        _mock_sched_create.assert_called_once_with(self.context, request_spec,
+                                                   {})
+        self.assertFalse(_mock_sleep.called)
+
     @mock.patch('cinder.scheduler.driver.Scheduler.host_passes_filters')
     @mock.patch('cinder.db.volume_update')
     def test_migrate_volume_exception_returns_volume_state(