From: Zhiteng Huang Date: Sat, 6 Oct 2012 17:11:41 +0000 (+0800) Subject: Scheduler API clean up and refactor X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=7e26d1354af23668591d9df39ab61e67eeb37528;p=openstack-build%2Fcinder-build.git Scheduler API clean up and refactor Unlike Nova scheduler, which has to consider serving compute and volume scheduling, Cinder scheduler only serves volume scheduling, so there's no need to keep generic interface 'schedule'. Instead, 'schedule_create_volume' is added (if missing) to manager/driver class and chance/simple scheduler driver implementation. Also this patch changes the interface between API service and scheduler to allow more information about volume is passed to scheduler for advanced scheduling while maintained backward compatibility. And this change bumps scheduler RPC API to version 1.2. Change-Id: I42be05675cd73f89a03c84105ec512d7ee4f3c3a --- diff --git a/cinder/scheduler/chance.py b/cinder/scheduler/chance.py index 3cee1f053..e5af364f2 100644 --- a/cinder/scheduler/chance.py +++ b/cinder/scheduler/chance.py @@ -24,9 +24,13 @@ Chance (Random) Scheduler implementation import random from cinder import exception +from cinder import flags from cinder.scheduler import driver +FLAGS = flags.FLAGS + + class ChanceScheduler(driver.Scheduler): """Implements Scheduler as a random node selector.""" @@ -54,8 +58,16 @@ class ChanceScheduler(driver.Scheduler): return hosts[int(random.random() * len(hosts))] - def schedule(self, context, topic, method, *_args, **kwargs): + def schedule_create_volume(self, context, request_spec, filter_properties): """Picks a host that is up at random.""" + topic = FLAGS.volume_topic + host = self._schedule(context, topic, request_spec, + filter_properties=filter_properties) + volume_id = request_spec['volume_id'] + snapshot_id = request_spec['snapshot_id'] + image_id = request_spec['image_id'] - host = self._schedule(context, topic, None, **kwargs) - driver.cast_to_host(context, topic, host, method, **kwargs) + driver.cast_to_host(context, topic, host, 'create_volume', + volume_id=volume_id, + snapshot_id=snapshot_id, + image_id=image_id) diff --git a/cinder/scheduler/driver.py b/cinder/scheduler/driver.py index 498918752..a4e545c3f 100644 --- a/cinder/scheduler/driver.py +++ b/cinder/scheduler/driver.py @@ -107,3 +107,7 @@ class Scheduler(object): def schedule(self, context, topic, method, *_args, **_kwargs): """Must override schedule method for scheduler to work.""" raise NotImplementedError(_("Must implement a fallback schedule")) + + def schedule_create_volume(self, context, request_spec, filter_properties): + """Must override schedule method for scheduler to work.""" + raise NotImplementedError(_("Must implement schedule_create_volume")) diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index fae6f2004..e3b3a01c1 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -24,12 +24,14 @@ Scheduler Service import functools from cinder import db +from cinder import exception from cinder import flags -from cinder.openstack.common import log as logging from cinder import manager from cinder.openstack.common import cfg from cinder.openstack.common import excutils from cinder.openstack.common import importutils +from cinder.openstack.common import log as logging +from cinder.openstack.common.notifier import api as notifier LOG = logging.getLogger(__name__) @@ -45,7 +47,7 @@ FLAGS.register_opt(scheduler_driver_opt) class SchedulerManager(manager.Manager): """Chooses a host to create volumes""" - RPC_API_VERSION = '1.1' + RPC_API_VERSION = '1.2' def __init__(self, scheduler_driver=None, *args, **kwargs): if not scheduler_driver: @@ -53,13 +55,6 @@ class SchedulerManager(manager.Manager): self.driver = importutils.import_object(scheduler_driver) super(SchedulerManager, self).__init__(*args, **kwargs) - def __getattr__(self, key): - """Converts all method calls to use the schedule method""" - # NOTE(russellb) Because of what this is doing, we must be careful - # when changing the API of the scheduler drivers, as that changes - # the rpc API as well, and the version should be updated accordingly. - return functools.partial(self._schedule, key) - def get_host_list(self, context): """Get a list of hosts from the HostManager.""" return self.driver.get_host_list() @@ -76,23 +71,59 @@ class SchedulerManager(manager.Manager): self.driver.update_service_capabilities(service_name, host, capabilities) - def _schedule(self, method, context, topic, *args, **kwargs): - """Tries to call schedule_* method on the driver to retrieve host. - Falls back to schedule(context, topic) if method doesn't exist. - """ - driver_method_name = 'schedule_%s' % method + def create_volume(self, context, topic, volume_id, snapshot_id=None, + image_id=None, request_spec=None, + filter_properties=None): try: - driver_method = getattr(self.driver, driver_method_name) - args = (context,) + args - except AttributeError, e: - LOG.warning(_("Driver Method %(driver_method_name)s missing: " - "%(e)s. Reverting to schedule()") % locals()) - driver_method = self.driver.schedule - args = (context, topic, method) + args - - try: - return driver_method(*args, **kwargs) - except Exception: + if request_spec is None: + # For RPC version < 1.2 backward compatibility + request_spec = {} + volume_ref = db.volume_get(context, volume_id) + size = volume_ref.get('size') + availability_zone = volume_ref.get('availability_zone') + volume_type_id = volume_ref.get('volume_type_id') + vol_type = db.volume_type_get(context, volume_type_id) + volume_properties = {'size': size, + 'availability_zone': availability_zone, + 'volume_type_id': volume_type_id} + request_spec.update({'volume_id': volume_id, + 'snapshot_id': snapshot_id, + 'image_id': image_id, + 'volume_properties': volume_properties, + 'volume_type': dict(vol_type).iteritems()}) + + self.driver.schedule_create_volume(context, request_spec, + filter_properties) + except exception.NoValidHost as ex: + volume_state = {'volume_state': {'status': 'error'}} + self._set_volume_state_and_notify('create_volume', + volume_state, + context, ex, request_spec) + except Exception as ex: with excutils.save_and_reraise_exception(): - volume_id = kwargs.get('volume_id') - db.volume_update(context, volume_id, {'status': 'error'}) + volume_state = {'volume_state': {'status': 'error'}} + self._set_volume_state_and_notify('create_volume', + volume_state, + context, ex, request_spec) + + def _set_volume_state_and_notify(self, method, updates, context, ex, + request_spec): + LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals()) + + volume_state = updates['volume_state'] + properties = request_spec.get('volume_properties', {}) + + volume_id = request_spec.get('volume_id', None) + + if volume_id: + db.volume_update(context, volume_id, volume_state) + + payload = dict(request_spec=request_spec, + volume_properties=properties, + volume_id=volume_id, + state=volume_state, + method=method, + reason=ex) + + notifier.notify(context, notifier.publisher_id("scheduler"), + 'scheduler.' + method, notifier.ERROR, payload) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index a253f0d29..596642337 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -32,6 +32,8 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy): 1.0 - Initial version. 1.1 - Add create_volume() method + 1.2 - Add request_spec, filter_properties arguments + to create_volume() ''' RPC_API_VERSION = '1.0' @@ -41,14 +43,16 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy): default_version=self.RPC_API_VERSION) def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, - image_id=None): + image_id=None, request_spec=None, + filter_properties=None): return self.cast(ctxt, self.make_msg('create_volume', - topic=topic, - volume_id=volume_id, - snapshot_id=snapshot_id, - image_id=image_id), - topic=None, - version='1.1') + topic=topic, + volume_id=volume_id, + snapshot_id=snapshot_id, + image_id=image_id, + request_spec=request_spec, + filter_properties=filter_properties), + version='1.2') def update_service_capabilities(self, ctxt, service_name, host, capabilities): diff --git a/cinder/scheduler/simple.py b/cinder/scheduler/simple.py index d7c832894..6ced96a51 100644 --- a/cinder/scheduler/simple.py +++ b/cinder/scheduler/simple.py @@ -43,22 +43,28 @@ FLAGS.register_opts(simple_scheduler_opts) class SimpleScheduler(chance.ChanceScheduler): """Implements Naive Scheduler that tries to find least loaded host.""" - def schedule_create_volume(self, context, volume_id, **_kwargs): + def schedule_create_volume(self, context, request_spec, filter_properties): """Picks a host that is up and has the fewest volumes.""" elevated = context.elevated() - volume_ref = db.volume_get(context, volume_id) - availability_zone = volume_ref.get('availability_zone') + volume_id = request_spec.get('volume_id') + snapshot_id = request_spec.get('snapshot_id') + image_id = request_spec.get('image_id') + volume_properties = request_spec.get('volume_properties') + volume_size = volume_properties.get('size') + availability_zone = volume_properties.get('availability_zone') zone, host = None, None if availability_zone: zone, _x, host = availability_zone.partition(':') if host and context.is_admin: - service = db.service_get_by_args(elevated, host, 'cinder-volume') + topic = FLAGS.volume_topic + service = db.service_get_by_args(elevated, host, topic) if not utils.service_is_up(service): raise exception.WillNotSchedule(host=host) driver.cast_to_volume_host(context, host, 'create_volume', - volume_id=volume_id, **_kwargs) + volume_id=volume_id, snapshot_id=snapshot_id, + image_id=image_id) return None results = db.service_get_all_volume_sorted(elevated) @@ -67,12 +73,13 @@ class SimpleScheduler(chance.ChanceScheduler): if service['availability_zone'] == zone] for result in results: (service, volume_gigabytes) = result - if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes: + if volume_gigabytes + volume_size > FLAGS.max_gigabytes: msg = _("Not enough allocatable volume gigabytes remaining") raise exception.NoValidHost(reason=msg) if utils.service_is_up(service) and not service['disabled']: driver.cast_to_volume_host(context, service['host'], - 'create_volume', volume_id=volume_id, **_kwargs) + 'create_volume', volume_id=volume_id, + snapshot_id=snapshot_id, image_id=image_id) return None msg = _("Is the appropriate service running?") raise exception.NoValidHost(reason=msg) diff --git a/cinder/tests/scheduler/test_rpcapi.py b/cinder/tests/scheduler/test_rpcapi.py index 7b61fa9ab..bf9e8abdf 100644 --- a/cinder/tests/scheduler/test_rpcapi.py +++ b/cinder/tests/scheduler/test_rpcapi.py @@ -69,8 +69,8 @@ class SchedulerRpcAPITestCase(test.TestCase): def test_create_volume(self): self._test_scheduler_api('create_volume', - rpc_method='cast', topic='fake_topic', - volume_id='fake_volume_id', - snapshot_id='fake_snapshot_id', - image_id='fake_image_id', - version='1.1') + rpc_method='cast', topic='topic', volume_id='volume_id', + snapshot_id='snapshot_id', image_id='image_id', + request_spec='fake_request_spec', + filter_properties='filter_properties', + version='1.2') diff --git a/cinder/tests/scheduler/test_scheduler.py b/cinder/tests/scheduler/test_scheduler.py index f6a74b58d..d0026a7c2 100644 --- a/cinder/tests/scheduler/test_scheduler.py +++ b/cinder/tests/scheduler/test_scheduler.py @@ -22,6 +22,7 @@ Tests For Scheduler from cinder import context from cinder import db +from cinder import exception from cinder import flags from cinder.openstack.common import rpc from cinder.openstack.common import timeutils @@ -57,28 +58,6 @@ class SchedulerManagerTestCase(test.TestCase): manager = self.manager self.assertTrue(isinstance(manager.driver, self.driver_cls)) - def test_get_host_list(self): - expected = 'fake_hosts' - - self.mox.StubOutWithMock(self.manager.driver, 'get_host_list') - self.manager.driver.get_host_list().AndReturn(expected) - - self.mox.ReplayAll() - result = self.manager.get_host_list(self.context) - self.assertEqual(result, expected) - - def test_get_service_capabilities(self): - expected = 'fake_service_capabs' - - self.mox.StubOutWithMock(self.manager.driver, - 'get_service_capabilities') - self.manager.driver.get_service_capabilities().AndReturn( - expected) - - self.mox.ReplayAll() - result = self.manager.get_service_capabilities(self.context) - self.assertEqual(result, expected) - def test_update_service_capabilities(self): service_name = 'fake_service' host = 'fake_host' @@ -104,28 +83,26 @@ class SchedulerManagerTestCase(test.TestCase): service_name=service_name, host=host, capabilities=capabilities) - def test_existing_method(self): - def stub_method(self, *args, **kwargs): - pass - setattr(self.manager.driver, 'schedule_stub_method', stub_method) - - self.mox.StubOutWithMock(self.manager.driver, - 'schedule_stub_method') - self.manager.driver.schedule_stub_method(self.context, - *self.fake_args, **self.fake_kwargs) + def test_create_volume_exception_puts_volume_in_error_state(self): + """ Test that a NoValideHost exception for create_volume puts + the volume in 'error' state and eats the exception. + """ + fake_volume_id = 1 + self._mox_schedule_method_helper('schedule_create_volume') + self.mox.StubOutWithMock(db, 'volume_update') - self.mox.ReplayAll() - self.manager.stub_method(self.context, self.topic, - *self.fake_args, **self.fake_kwargs) + topic = 'fake_topic' + volume_id = fake_volume_id + request_spec = {'volume_id': fake_volume_id} - def test_missing_method_fallback(self): - self.mox.StubOutWithMock(self.manager.driver, 'schedule') - self.manager.driver.schedule(self.context, self.topic, - 'noexist', *self.fake_args, **self.fake_kwargs) + self.manager.driver.schedule_create_volume(self.context, + request_spec, {}).AndRaise(exception.NoValidHost(reason="")) + db.volume_update(self.context, fake_volume_id, {'status': 'error'}) self.mox.ReplayAll() - self.manager.noexist(self.context, self.topic, - *self.fake_args, **self.fake_kwargs) + self.manager.create_volume(self.context, topic, volume_id, + request_spec=request_spec, + filter_properties={}) def _mox_schedule_method_helper(self, method_name): # Make sure the method exists that we're going to test call @@ -150,28 +127,6 @@ class SchedulerTestCase(test.TestCase): self.context = context.RequestContext('fake_user', 'fake_project') self.topic = 'fake_topic' - def test_get_host_list(self): - expected = 'fake_hosts' - - self.mox.StubOutWithMock(self.driver.host_manager, 'get_host_list') - self.driver.host_manager.get_host_list().AndReturn(expected) - - self.mox.ReplayAll() - result = self.driver.get_host_list() - self.assertEqual(result, expected) - - def test_get_service_capabilities(self): - expected = 'fake_service_capabs' - - self.mox.StubOutWithMock(self.driver.host_manager, - 'get_service_capabilities') - self.driver.host_manager.get_service_capabilities().AndReturn( - expected) - - self.mox.ReplayAll() - result = self.driver.get_service_capabilities() - self.assertEqual(result, expected) - def test_update_service_capabilities(self): service_name = 'fake_service' host = 'fake_host' diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 49400b73d..198e32588 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -178,17 +178,30 @@ class API(base.Base): finally: QUOTAS.rollback(context, reservations) - self._cast_create_volume(context, volume['id'], snapshot_id, - image_id) + request_spec = { + 'volume_properties': options, + 'volume_type': volume_type, + 'volume_id': volume['id'], + 'snapshot_id': volume['snapshot_id'], + 'image_id': image_id + } + + filter_properties = {} + + self._cast_create_volume(context, request_spec, filter_properties) + return volume - def _cast_create_volume(self, context, volume_id, snapshot_id, - image_id): + def _cast_create_volume(self, context, request_spec, filter_properties): # NOTE(Rongze Zhu): It is a simple solution for bug 1008866 # If snapshot_id is set, make the call create volume directly to # the volume host where the snapshot resides instead of passing it - # through the scheduer. So snapshot can be copy to new volume. + # through the scheduler. So snapshot can be copy to new volume. + + volume_id = request_spec['volume_id'] + snapshot_id = request_spec['snapshot_id'] + image_id = request_spec['image_id'] if snapshot_id and FLAGS.snapshot_same_host: snapshot_ref = self.db.snapshot_get(context, snapshot_id) @@ -208,8 +221,10 @@ class API(base.Base): self.scheduler_rpcapi.create_volume(context, FLAGS.volume_topic, volume_id, - snapshot_id=snapshot_id, - image_id=image_id) + snapshot_id, + image_id, + request_spec=request_spec, + filter_properties=filter_properties) @wrap_check_policy def delete(self, context, volume, force=False):