Scheduler Service
"""
+import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
'combination of filters and weighers.'))
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
+ self._startup_delay = True
def init_host_with_rpc(self):
ctxt = context.get_admin_context()
self.request_service_capabilities(ctxt)
+ eventlet.sleep(CONF.periodic_interval)
+ self._startup_delay = False
+
def update_service_capabilities(self, context, service_name=None,
host=None, capabilities=None, **kwargs):
"""Process a capability update from a service node."""
host,
capabilities)
+ def _wait_for_scheduler(self):
+ # NOTE(dulek): We're waiting for scheduler to announce that it's ready
+ # or CONF.periodic_interval seconds from service startup has passed.
+ while self._startup_delay and not self.driver.is_ready():
+ eventlet.sleep(1)
+
def create_consistencygroup(self, context, topic,
group_id,
request_spec_list=None,
filter_properties_list=None):
+
+ self._wait_for_scheduler()
try:
self.driver.schedule_create_consistencygroup(
context, group_id,
image_id=None, request_spec=None,
filter_properties=None):
+ self._wait_for_scheduler()
try:
flow_engine = create_volume.get_flow(context,
db, self.driver,
filter_properties=None):
"""Ensure that the host exists and can accept the volume."""
+ self._wait_for_scheduler()
+
def _migrate_volume_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'migration_status': None}}
self._set_volume_state_and_notify('migrate_volume_to_host',
:param request_spec: parameters for this retype request
:param filter_properties: parameters to filter by
"""
+
+ self._wait_for_scheduler()
+
def _retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations):
if reservations:
request_spec, filter_properties=None):
"""Ensure that the host exists and can accept the volume."""
+ self._wait_for_scheduler()
+
def _manage_existing_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'status': 'error'}}
self._set_volume_state_and_notify('manage_existing', volume_state,
request_spec.get('ref'))
def get_pools(self, context, filters=None):
- """Get active pools from scheduler's cache."""
+ """Get active pools from scheduler's cache.
+
+ NOTE(dulek): There's no self._wait_for_scheduler() because get_pools is
+ an RPC call (is blocking for the c-api). Also this is admin-only API
+ extension so it won't hurt the user much to retry the request manually.
+ """
return self.driver.get_pools(context, filters)
def _set_volume_state_and_notify(self, method, updates, context, ex,
import uuid
import fixtures
+import mock
from oslo_log import log as logging
from cinder import service
# set up services
self.volume = self.start_service('volume')
- self.scheduler = self.start_service('scheduler')
+ # NOTE(dulek): Mocking eventlet.sleep so test won't time out on
+ # scheduler service start.
+ with mock.patch('eventlet.sleep'):
+ self.scheduler = self.start_service('scheduler')
self._start_api_service()
self.addCleanup(self.osapi.stop)
super(SchedulerManagerTestCase, self).setUp()
self.flags(scheduler_driver=self.driver_cls_name)
self.manager = self.manager_cls()
+ self.manager._startup_delay = False
self.context = context.RequestContext('fake_user', 'fake_project')
self.topic = 'fake_topic'
self.fake_args = (1, 2, 3)
manager = self.manager
self.assertIsInstance(manager.driver, self.driver_cls)
+ @mock.patch('eventlet.sleep')
+ @mock.patch('cinder.volume.rpcapi.VolumeAPI.publish_service_capabilities')
+ def test_init_host_with_rpc(self, publish_capabilities_mock, sleep_mock):
+ self.manager._startup_delay = True
+ self.manager.init_host_with_rpc()
+ publish_capabilities_mock.assert_called_once_with(mock.ANY)
+ sleep_mock.assert_called_once_with(CONF.periodic_interval)
+ self.assertFalse(self.manager._startup_delay)
+
@mock.patch('cinder.scheduler.driver.Scheduler.'
'update_service_capabilities')
def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
_mock_sched_create.assert_called_once_with(self.context, request_spec,
{})
+ @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+ @mock.patch('eventlet.sleep')
+ def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create):
+ fake_volume_id = 1
+ topic = 'fake_topic'
+
+ request_spec = {'volume_id': fake_volume_id}
+
+ self.manager.create_volume(self.context, topic, fake_volume_id,
+ request_spec=request_spec,
+ filter_properties={})
+ _mock_sched_create.assert_called_once_with(self.context, request_spec,
+ {})
+ self.assertFalse(_mock_sleep.called)
+
+ @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+ @mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
+ @mock.patch('eventlet.sleep')
+ def test_create_volume_delay_scheduled_after_3_tries(self, _mock_sleep,
+ _mock_is_ready,
+ _mock_sched_create):
+ self.manager._startup_delay = True
+ fake_volume_id = 1
+ topic = 'fake_topic'
+
+ request_spec = {'volume_id': fake_volume_id}
+
+ _mock_is_ready.side_effect = [False, False, True]
+
+ self.manager.create_volume(self.context, topic, fake_volume_id,
+ request_spec=request_spec,
+ filter_properties={})
+ _mock_sched_create.assert_called_once_with(self.context, request_spec,
+ {})
+ calls = [mock.call(1)] * 2
+ _mock_sleep.assert_has_calls(calls)
+ self.assertEqual(2, _mock_sleep.call_count)
+
+ @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+ @mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
+ @mock.patch('eventlet.sleep')
+ def test_create_volume_delay_scheduled_in_1_try(self, _mock_sleep,
+ _mock_is_ready,
+ _mock_sched_create):
+ self.manager._startup_delay = True
+ fake_volume_id = 1
+ topic = 'fake_topic'
+
+ request_spec = {'volume_id': fake_volume_id}
+
+ _mock_is_ready.return_value = True
+
+ self.manager.create_volume(self.context, topic, fake_volume_id,
+ request_spec=request_spec,
+ filter_properties={})
+ _mock_sched_create.assert_called_once_with(self.context, request_spec,
+ {})
+ self.assertFalse(_mock_sleep.called)
+
@mock.patch('cinder.scheduler.driver.Scheduler.host_passes_filters')
@mock.patch('cinder.db.volume_update')
def test_migrate_volume_exception_returns_volume_state(