From: Michal Dulko Date: Thu, 12 Mar 2015 16:24:09 +0000 (+0100) Subject: Add waiting for the driver to SchedulerManager X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=89106c52720e25f51384b3d77c710c7ddc7f1724;p=openstack-build%2Fcinder-build.git Add waiting for the driver to SchedulerManager This patch adds _wait_for_scheduler method before serving any request. Method waits till scheduler.is_ready() returns true or CONF.periodic_interval seconds passed from service startup. Change-Id: I9fab9fb076a955a24c1c157229baf027359d9771 Closes-Bug: 1409012 --- diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index ca0cc8681..cc03b15ac 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -19,6 +19,7 @@ Scheduler Service """ +import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging @@ -75,11 +76,15 @@ class SchedulerManager(manager.Manager): 'combination of filters and weighers.')) self.driver = importutils.import_object(scheduler_driver) super(SchedulerManager, self).__init__(*args, **kwargs) + self._startup_delay = True def init_host_with_rpc(self): ctxt = context.get_admin_context() self.request_service_capabilities(ctxt) + eventlet.sleep(CONF.periodic_interval) + self._startup_delay = False + def update_service_capabilities(self, context, service_name=None, host=None, capabilities=None, **kwargs): """Process a capability update from a service node.""" @@ -89,10 +94,18 @@ class SchedulerManager(manager.Manager): host, capabilities) + def _wait_for_scheduler(self): + # NOTE(dulek): We're waiting for scheduler to announce that it's ready + # or CONF.periodic_interval seconds from service startup has passed. + while self._startup_delay and not self.driver.is_ready(): + eventlet.sleep(1) + def create_consistencygroup(self, context, topic, group_id, request_spec_list=None, filter_properties_list=None): + + self._wait_for_scheduler() try: self.driver.schedule_create_consistencygroup( context, group_id, @@ -117,6 +130,7 @@ class SchedulerManager(manager.Manager): image_id=None, request_spec=None, filter_properties=None): + self._wait_for_scheduler() try: flow_engine = create_volume.get_flow(context, db, self.driver, @@ -142,6 +156,8 @@ class SchedulerManager(manager.Manager): filter_properties=None): """Ensure that the host exists and can accept the volume.""" + self._wait_for_scheduler() + def _migrate_volume_set_error(self, context, ex, request_spec): volume_state = {'volume_state': {'migration_status': None}} self._set_volume_state_and_notify('migrate_volume_to_host', @@ -173,6 +189,9 @@ class SchedulerManager(manager.Manager): :param request_spec: parameters for this retype request :param filter_properties: parameters to filter by """ + + self._wait_for_scheduler() + def _retype_volume_set_error(self, context, ex, request_spec, volume_ref, msg, reservations): if reservations: @@ -223,6 +242,8 @@ class SchedulerManager(manager.Manager): request_spec, filter_properties=None): """Ensure that the host exists and can accept the volume.""" + self._wait_for_scheduler() + def _manage_existing_set_error(self, context, ex, request_spec): volume_state = {'volume_state': {'status': 'error'}} self._set_volume_state_and_notify('manage_existing', volume_state, @@ -244,7 +265,12 @@ class SchedulerManager(manager.Manager): request_spec.get('ref')) def get_pools(self, context, filters=None): - """Get active pools from scheduler's cache.""" + """Get active pools from scheduler's cache. + + NOTE(dulek): There's no self._wait_for_scheduler() because get_pools is + an RPC call (is blocking for the c-api). Also this is admin-only API + extension so it won't hurt the user much to retry the request manually. + """ return self.driver.get_pools(context, filters) def _set_volume_state_and_notify(self, method, updates, context, ex, diff --git a/cinder/tests/integrated/integrated_helpers.py b/cinder/tests/integrated/integrated_helpers.py index 70ed6c18f..20d6fbff1 100644 --- a/cinder/tests/integrated/integrated_helpers.py +++ b/cinder/tests/integrated/integrated_helpers.py @@ -22,6 +22,7 @@ import string import uuid import fixtures +import mock from oslo_log import log as logging from cinder import service @@ -69,7 +70,10 @@ class _IntegratedTestBase(test.TestCase): # set up services self.volume = self.start_service('volume') - self.scheduler = self.start_service('scheduler') + # NOTE(dulek): Mocking eventlet.sleep so test won't time out on + # scheduler service start. + with mock.patch('eventlet.sleep'): + self.scheduler = self.start_service('scheduler') self._start_api_service() self.addCleanup(self.osapi.stop) diff --git a/cinder/tests/scheduler/test_scheduler.py b/cinder/tests/scheduler/test_scheduler.py index 784c1fc2b..623fc3219 100644 --- a/cinder/tests/scheduler/test_scheduler.py +++ b/cinder/tests/scheduler/test_scheduler.py @@ -49,6 +49,7 @@ class SchedulerManagerTestCase(test.TestCase): super(SchedulerManagerTestCase, self).setUp() self.flags(scheduler_driver=self.driver_cls_name) self.manager = self.manager_cls() + self.manager._startup_delay = False self.context = context.RequestContext('fake_user', 'fake_project') self.topic = 'fake_topic' self.fake_args = (1, 2, 3) @@ -59,6 +60,15 @@ class SchedulerManagerTestCase(test.TestCase): manager = self.manager self.assertIsInstance(manager.driver, self.driver_cls) + @mock.patch('eventlet.sleep') + @mock.patch('cinder.volume.rpcapi.VolumeAPI.publish_service_capabilities') + def test_init_host_with_rpc(self, publish_capabilities_mock, sleep_mock): + self.manager._startup_delay = True + self.manager.init_host_with_rpc() + publish_capabilities_mock.assert_called_once_with(mock.ANY) + sleep_mock.assert_called_once_with(CONF.periodic_interval) + self.assertFalse(self.manager._startup_delay) + @mock.patch('cinder.scheduler.driver.Scheduler.' 'update_service_capabilities') def test_update_service_capabilities_empty_dict(self, _mock_update_cap): @@ -105,6 +115,65 @@ class SchedulerManagerTestCase(test.TestCase): _mock_sched_create.assert_called_once_with(self.context, request_spec, {}) + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') + @mock.patch('eventlet.sleep') + def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create): + fake_volume_id = 1 + topic = 'fake_topic' + + request_spec = {'volume_id': fake_volume_id} + + self.manager.create_volume(self.context, topic, fake_volume_id, + request_spec=request_spec, + filter_properties={}) + _mock_sched_create.assert_called_once_with(self.context, request_spec, + {}) + self.assertFalse(_mock_sleep.called) + + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') + @mock.patch('cinder.scheduler.driver.Scheduler.is_ready') + @mock.patch('eventlet.sleep') + def test_create_volume_delay_scheduled_after_3_tries(self, _mock_sleep, + _mock_is_ready, + _mock_sched_create): + self.manager._startup_delay = True + fake_volume_id = 1 + topic = 'fake_topic' + + request_spec = {'volume_id': fake_volume_id} + + _mock_is_ready.side_effect = [False, False, True] + + self.manager.create_volume(self.context, topic, fake_volume_id, + request_spec=request_spec, + filter_properties={}) + _mock_sched_create.assert_called_once_with(self.context, request_spec, + {}) + calls = [mock.call(1)] * 2 + _mock_sleep.assert_has_calls(calls) + self.assertEqual(2, _mock_sleep.call_count) + + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') + @mock.patch('cinder.scheduler.driver.Scheduler.is_ready') + @mock.patch('eventlet.sleep') + def test_create_volume_delay_scheduled_in_1_try(self, _mock_sleep, + _mock_is_ready, + _mock_sched_create): + self.manager._startup_delay = True + fake_volume_id = 1 + topic = 'fake_topic' + + request_spec = {'volume_id': fake_volume_id} + + _mock_is_ready.return_value = True + + self.manager.create_volume(self.context, topic, fake_volume_id, + request_spec=request_spec, + filter_properties={}) + _mock_sched_create.assert_called_once_with(self.context, request_spec, + {}) + self.assertFalse(_mock_sleep.called) + @mock.patch('cinder.scheduler.driver.Scheduler.host_passes_filters') @mock.patch('cinder.db.volume_update') def test_migrate_volume_exception_returns_volume_state(