Scheduler API clean up and refactor
author Zhiteng Huang <zhiteng.huang@intel.com>
Sat, 6 Oct 2012 17:11:41 +0000 (01:11 +0800)
committer Zhiteng Huang <zhiteng.huang@intel.com>
Wed, 7 Nov 2012 08:28:07 +0000 (16:28 +0800)
Unlike the Nova scheduler, which has to handle both compute and volume
scheduling, the Cinder scheduler serves volume scheduling only, so there is
no need to keep the generic 'schedule' interface. Instead,
'schedule_create_volume' is added (where missing) to the manager/driver
classes and to the chance/simple scheduler driver implementations.
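
For illustration, a minimal sketch of the resulting driver contract,
assuming simplified stand-in classes (the real signatures appear in the
cinder/scheduler/driver.py and chance.py hunks below; the class name
ChanceSchedulerSketch and the host list are hypothetical):

import random


class Scheduler(object):
    """Base class: drivers must override schedule_create_volume."""

    def schedule_create_volume(self, context, request_spec,
                               filter_properties):
        raise NotImplementedError("Must implement schedule_create_volume")


class ChanceSchedulerSketch(Scheduler):
    """Toy driver: picks a host at random, as chance.py does."""

    def schedule_create_volume(self, context, request_spec,
                               filter_properties):
        hosts = ['host1', 'host2', 'host3']  # stand-in for a host lookup
        host = random.choice(hosts)
        # A real driver would now cast 'create_volume' to this host,
        # passing volume_id/snapshot_id/image_id from request_spec.
        return host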

This patch also changes the interface between the API service and the
scheduler so that more information about the volume is passed to the
scheduler for advanced scheduling, while maintaining backward
compatibility. It bumps the scheduler RPC API to version 1.2.
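
As a hedged sketch of the new call shape (the field names follow the
request_spec built in the cinder/volume/api.py hunk below; the literal
values here are hypothetical):

# Hypothetical example values; the field names match the request_spec
# assembled by cinder.volume.api.API.create() in the diff below.
request_spec = {
    'volume_properties': {'size': 10,                  # size in GB
                          'availability_zone': 'nova',
                          'volume_type_id': None},
    'volume_type': {},
    'volume_id': 'fake-volume-uuid',
    'snapshot_id': None,
    'image_id': None,
}
filter_properties = {}

# The cast is then made at RPC version 1.2, roughly:
#   scheduler_rpcapi.create_volume(ctxt, FLAGS.volume_topic, volume_id,
#                                  snapshot_id, image_id,
#                                  request_spec=request_spec,
#                                  filter_properties=filter_properties)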

Change-Id: I42be05675cd73f89a03c84105ec512d7ee4f3c3a

cinder/scheduler/chance.py
cinder/scheduler/driver.py
cinder/scheduler/manager.py
cinder/scheduler/rpcapi.py
cinder/scheduler/simple.py
cinder/tests/scheduler/test_rpcapi.py
cinder/tests/scheduler/test_scheduler.py
cinder/volume/api.py

cinder/scheduler/chance.py
index 3cee1f0534945a3d8b91ac553b4a18d4ea9b1c74..e5af364f2765f0f79e321d21ac98c9923ed9f252 100644 (file)
@@ -24,9 +24,13 @@ Chance (Random) Scheduler implementation
 import random
 
 from cinder import exception
+from cinder import flags
 from cinder.scheduler import driver
 
 
+FLAGS = flags.FLAGS
+
+
 class ChanceScheduler(driver.Scheduler):
     """Implements Scheduler as a random node selector."""
 
@@ -54,8 +58,16 @@ class ChanceScheduler(driver.Scheduler):
 
         return hosts[int(random.random() * len(hosts))]
 
-    def schedule(self, context, topic, method, *_args, **kwargs):
+    def schedule_create_volume(self, context, request_spec, filter_properties):
         """Picks a host that is up at random."""
+        topic = FLAGS.volume_topic
+        host = self._schedule(context, topic, request_spec,
+                              filter_properties=filter_properties)
+        volume_id = request_spec['volume_id']
+        snapshot_id = request_spec['snapshot_id']
+        image_id = request_spec['image_id']
 
-        host = self._schedule(context, topic, None, **kwargs)
-        driver.cast_to_host(context, topic, host, method, **kwargs)
+        driver.cast_to_host(context, topic, host, 'create_volume',
+                            volume_id=volume_id,
+                            snapshot_id=snapshot_id,
+                            image_id=image_id)
cinder/scheduler/driver.py
index 498918752204ba38c122344b9eeee9869e6efd62..a4e545c3f524c1caba571e0c7c5753c92433fcda 100644 (file)
@@ -107,3 +107,7 @@ class Scheduler(object):
     def schedule(self, context, topic, method, *_args, **_kwargs):
         """Must override schedule method for scheduler to work."""
         raise NotImplementedError(_("Must implement a fallback schedule"))
+
+    def schedule_create_volume(self, context, request_spec, filter_properties):
+        """Must override schedule method for scheduler to work."""
+        raise NotImplementedError(_("Must implement schedule_create_volume"))
cinder/scheduler/manager.py
index fae6f2004a29a3f26550ac781322c19478d84d12..e3b3a01c1d580eadb6b1e691a25bfe145df923d7 100644 (file)
@@ -24,12 +24,14 @@ Scheduler Service
 import functools
 
 from cinder import db
+from cinder import exception
 from cinder import flags
-from cinder.openstack.common import log as logging
 from cinder import manager
 from cinder.openstack.common import cfg
 from cinder.openstack.common import excutils
 from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.notifier import api as notifier
 
 
 LOG = logging.getLogger(__name__)
@@ -45,7 +47,7 @@ FLAGS.register_opt(scheduler_driver_opt)
 class SchedulerManager(manager.Manager):
     """Chooses a host to create volumes"""
 
-    RPC_API_VERSION = '1.1'
+    RPC_API_VERSION = '1.2'
 
     def __init__(self, scheduler_driver=None, *args, **kwargs):
         if not scheduler_driver:
@@ -53,13 +55,6 @@ class SchedulerManager(manager.Manager):
         self.driver = importutils.import_object(scheduler_driver)
         super(SchedulerManager, self).__init__(*args, **kwargs)
 
-    def __getattr__(self, key):
-        """Converts all method calls to use the schedule method"""
-        # NOTE(russellb) Because of what this is doing, we must be careful
-        # when changing the API of the scheduler drivers, as that changes
-        # the rpc API as well, and the version should be updated accordingly.
-        return functools.partial(self._schedule, key)
-
     def get_host_list(self, context):
         """Get a list of hosts from the HostManager."""
         return self.driver.get_host_list()
@@ -76,23 +71,59 @@ class SchedulerManager(manager.Manager):
         self.driver.update_service_capabilities(service_name, host,
                 capabilities)
 
-    def _schedule(self, method, context, topic, *args, **kwargs):
-        """Tries to call schedule_* method on the driver to retrieve host.
-        Falls back to schedule(context, topic) if method doesn't exist.
-        """
-        driver_method_name = 'schedule_%s' % method
+    def create_volume(self, context, topic, volume_id, snapshot_id=None,
+                      image_id=None, request_spec=None,
+                      filter_properties=None):
         try:
-            driver_method = getattr(self.driver, driver_method_name)
-            args = (context,) + args
-        except AttributeError, e:
-            LOG.warning(_("Driver Method %(driver_method_name)s missing: "
-                       "%(e)s. Reverting to schedule()") % locals())
-            driver_method = self.driver.schedule
-            args = (context, topic, method) + args
-
-        try:
-            return driver_method(*args, **kwargs)
-        except Exception:
+            if request_spec is None:
+                # For RPC version < 1.2 backward compatibility
+                request_spec = {}
+                volume_ref = db.volume_get(context, volume_id)
+                size = volume_ref.get('size')
+                availability_zone = volume_ref.get('availability_zone')
+                volume_type_id = volume_ref.get('volume_type_id')
+                vol_type = db.volume_type_get(context, volume_type_id)
+                volume_properties = {'size': size,
+                                     'availability_zone': availability_zone,
+                                     'volume_type_id': volume_type_id}
+                request_spec.update({'volume_id': volume_id,
+                                     'snapshot_id': snapshot_id,
+                                     'image_id': image_id,
+                                     'volume_properties': volume_properties,
+                                     'volume_type': dict(vol_type)})
+
+            self.driver.schedule_create_volume(context, request_spec,
+                                               filter_properties)
+        except exception.NoValidHost as ex:
+            volume_state = {'volume_state': {'status': 'error'}}
+            self._set_volume_state_and_notify('create_volume',
+                                              volume_state,
+                                              context, ex, request_spec)
+        except Exception as ex:
             with excutils.save_and_reraise_exception():
-                volume_id = kwargs.get('volume_id')
-                db.volume_update(context, volume_id, {'status': 'error'})
+                volume_state = {'volume_state': {'status': 'error'}}
+                self._set_volume_state_and_notify('create_volume',
+                                                  volume_state,
+                                                  context, ex, request_spec)
+
+    def _set_volume_state_and_notify(self, method, updates, context, ex,
+                                     request_spec):
+        LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals())
+
+        volume_state = updates['volume_state']
+        properties = request_spec.get('volume_properties', {})
+
+        volume_id = request_spec.get('volume_id', None)
+
+        if volume_id:
+            db.volume_update(context, volume_id, volume_state)
+
+        payload = dict(request_spec=request_spec,
+                       volume_properties=properties,
+                       volume_id=volume_id,
+                       state=volume_state,
+                       method=method,
+                       reason=ex)
+
+        notifier.notify(context, notifier.publisher_id("scheduler"),
+                        'scheduler.' + method, notifier.ERROR, payload)
cinder/scheduler/rpcapi.py
index a253f0d297c979c43384aebec6a7aaf04322e8d1..5966423373b430232321328472629b197a346da9 100644 (file)
@@ -32,6 +32,8 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
 
         1.0 - Initial version.
         1.1 - Add create_volume() method
+        1.2 - Add request_spec, filter_properties arguments
+              to create_volume()
     '''
 
     RPC_API_VERSION = '1.0'
@@ -41,14 +43,16 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
                 default_version=self.RPC_API_VERSION)
 
     def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
-                      image_id=None):
+                      image_id=None, request_spec=None,
+                      filter_properties=None):
         return self.cast(ctxt, self.make_msg('create_volume',
-                                    topic=topic,
-                                    volume_id=volume_id,
-                                    snapshot_id=snapshot_id,
-                                    image_id=image_id),
-                         topic=None,
-                         version='1.1')
+                                         topic=topic,
+                                         volume_id=volume_id,
+                                         snapshot_id=snapshot_id,
+                                         image_id=image_id,
+                                         request_spec=request_spec,
+                                         filter_properties=filter_properties),
+                         version='1.2')
 
     def update_service_capabilities(self, ctxt, service_name, host,
             capabilities):
cinder/scheduler/simple.py
index d7c8328944d10c4e42edf826acca4c1cda5084e3..6ced96a51401abc2e3209f321f8f2a14fbe9a8c2 100644 (file)
@@ -43,22 +43,28 @@ FLAGS.register_opts(simple_scheduler_opts)
 class SimpleScheduler(chance.ChanceScheduler):
     """Implements Naive Scheduler that tries to find least loaded host."""
 
-    def schedule_create_volume(self, context, volume_id, **_kwargs):
+    def schedule_create_volume(self, context, request_spec, filter_properties):
         """Picks a host that is up and has the fewest volumes."""
         elevated = context.elevated()
 
-        volume_ref = db.volume_get(context, volume_id)
-        availability_zone = volume_ref.get('availability_zone')
+        volume_id = request_spec.get('volume_id')
+        snapshot_id = request_spec.get('snapshot_id')
+        image_id = request_spec.get('image_id')
+        volume_properties = request_spec.get('volume_properties')
+        volume_size = volume_properties.get('size')
+        availability_zone = volume_properties.get('availability_zone')
 
         zone, host = None, None
         if availability_zone:
             zone, _x, host = availability_zone.partition(':')
         if host and context.is_admin:
-            service = db.service_get_by_args(elevated, host, 'cinder-volume')
+            topic = FLAGS.volume_topic
+            service = db.service_get_by_args(elevated, host, topic)
             if not utils.service_is_up(service):
                 raise exception.WillNotSchedule(host=host)
             driver.cast_to_volume_host(context, host, 'create_volume',
-                    volume_id=volume_id, **_kwargs)
+                    volume_id=volume_id, snapshot_id=snapshot_id,
+                    image_id=image_id)
             return None
 
         results = db.service_get_all_volume_sorted(elevated)
@@ -67,12 +73,13 @@ class SimpleScheduler(chance.ChanceScheduler):
                        if service['availability_zone'] == zone]
         for result in results:
             (service, volume_gigabytes) = result
-            if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
+            if volume_gigabytes + volume_size > FLAGS.max_gigabytes:
                 msg = _("Not enough allocatable volume gigabytes remaining")
                 raise exception.NoValidHost(reason=msg)
             if utils.service_is_up(service) and not service['disabled']:
                 driver.cast_to_volume_host(context, service['host'],
-                        'create_volume', volume_id=volume_id, **_kwargs)
+                        'create_volume', volume_id=volume_id,
+                        snapshot_id=snapshot_id, image_id=image_id)
                 return None
         msg = _("Is the appropriate service running?")
         raise exception.NoValidHost(reason=msg)
cinder/tests/scheduler/test_rpcapi.py
index 7b61fa9ab0214413e89032462ffd78b76c9d08a1..bf9e8abdfd134b72c7100f8e55fb28462371767b 100644 (file)
@@ -69,8 +69,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
 
     def test_create_volume(self):
         self._test_scheduler_api('create_volume',
-                rpc_method='cast', topic='fake_topic',
-                volume_id='fake_volume_id',
-                snapshot_id='fake_snapshot_id',
-                image_id='fake_image_id',
-                version='1.1')
+                rpc_method='cast', topic='topic', volume_id='volume_id',
+                snapshot_id='snapshot_id', image_id='image_id',
+                request_spec='fake_request_spec',
+                filter_properties='filter_properties',
+                version='1.2')
cinder/tests/scheduler/test_scheduler.py
index f6a74b58d329ae0370b707e4fd655dbd24b50b9b..d0026a7c275a447be2213cadff7b4134e680754b 100644 (file)
@@ -22,6 +22,7 @@ Tests For Scheduler
 
 from cinder import context
 from cinder import db
+from cinder import exception
 from cinder import flags
 from cinder.openstack.common import rpc
 from cinder.openstack.common import timeutils
@@ -57,28 +58,6 @@ class SchedulerManagerTestCase(test.TestCase):
         manager = self.manager
         self.assertTrue(isinstance(manager.driver, self.driver_cls))
 
-    def test_get_host_list(self):
-        expected = 'fake_hosts'
-
-        self.mox.StubOutWithMock(self.manager.driver, 'get_host_list')
-        self.manager.driver.get_host_list().AndReturn(expected)
-
-        self.mox.ReplayAll()
-        result = self.manager.get_host_list(self.context)
-        self.assertEqual(result, expected)
-
-    def test_get_service_capabilities(self):
-        expected = 'fake_service_capabs'
-
-        self.mox.StubOutWithMock(self.manager.driver,
-                'get_service_capabilities')
-        self.manager.driver.get_service_capabilities().AndReturn(
-                expected)
-
-        self.mox.ReplayAll()
-        result = self.manager.get_service_capabilities(self.context)
-        self.assertEqual(result, expected)
-
     def test_update_service_capabilities(self):
         service_name = 'fake_service'
         host = 'fake_host'
@@ -104,28 +83,26 @@ class SchedulerManagerTestCase(test.TestCase):
                 service_name=service_name, host=host,
                 capabilities=capabilities)
 
-    def test_existing_method(self):
-        def stub_method(self, *args, **kwargs):
-            pass
-        setattr(self.manager.driver, 'schedule_stub_method', stub_method)
-
-        self.mox.StubOutWithMock(self.manager.driver,
-                'schedule_stub_method')
-        self.manager.driver.schedule_stub_method(self.context,
-                *self.fake_args, **self.fake_kwargs)
+    def test_create_volume_exception_puts_volume_in_error_state(self):
+        """ Test that a NoValideHost exception for create_volume puts
+        the volume in 'error' state and eats the exception.
+        """
+        fake_volume_id = 1
+        self._mox_schedule_method_helper('schedule_create_volume')
+        self.mox.StubOutWithMock(db, 'volume_update')
 
-        self.mox.ReplayAll()
-        self.manager.stub_method(self.context, self.topic,
-                *self.fake_args, **self.fake_kwargs)
+        topic = 'fake_topic'
+        volume_id = fake_volume_id
+        request_spec = {'volume_id': fake_volume_id}
 
-    def test_missing_method_fallback(self):
-        self.mox.StubOutWithMock(self.manager.driver, 'schedule')
-        self.manager.driver.schedule(self.context, self.topic,
-                'noexist', *self.fake_args, **self.fake_kwargs)
+        self.manager.driver.schedule_create_volume(self.context,
+            request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
+        db.volume_update(self.context, fake_volume_id, {'status': 'error'})
 
         self.mox.ReplayAll()
-        self.manager.noexist(self.context, self.topic,
-                *self.fake_args, **self.fake_kwargs)
+        self.manager.create_volume(self.context, topic, volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
 
     def _mox_schedule_method_helper(self, method_name):
         # Make sure the method exists that we're going to test call
@@ -150,28 +127,6 @@ class SchedulerTestCase(test.TestCase):
         self.context = context.RequestContext('fake_user', 'fake_project')
         self.topic = 'fake_topic'
 
-    def test_get_host_list(self):
-        expected = 'fake_hosts'
-
-        self.mox.StubOutWithMock(self.driver.host_manager, 'get_host_list')
-        self.driver.host_manager.get_host_list().AndReturn(expected)
-
-        self.mox.ReplayAll()
-        result = self.driver.get_host_list()
-        self.assertEqual(result, expected)
-
-    def test_get_service_capabilities(self):
-        expected = 'fake_service_capabs'
-
-        self.mox.StubOutWithMock(self.driver.host_manager,
-                'get_service_capabilities')
-        self.driver.host_manager.get_service_capabilities().AndReturn(
-                expected)
-
-        self.mox.ReplayAll()
-        result = self.driver.get_service_capabilities()
-        self.assertEqual(result, expected)
-
     def test_update_service_capabilities(self):
         service_name = 'fake_service'
         host = 'fake_host'
cinder/volume/api.py
index 49400b73d3c88d68f1d1ee5093540d0ae3fe354a..198e32588b2932f87def459d02d487c0ccf529b1 100644 (file)
@@ -178,17 +178,30 @@ class API(base.Base):
                 finally:
                     QUOTAS.rollback(context, reservations)
 
-        self._cast_create_volume(context, volume['id'], snapshot_id,
-                                 image_id)
+        request_spec = {
+            'volume_properties': options,
+            'volume_type': volume_type,
+            'volume_id': volume['id'],
+            'snapshot_id': volume['snapshot_id'],
+            'image_id': image_id
+        }
+
+        filter_properties = {}
+
+        self._cast_create_volume(context, request_spec, filter_properties)
+
         return volume
 
-    def _cast_create_volume(self, context, volume_id, snapshot_id,
-                            image_id):
+    def _cast_create_volume(self, context, request_spec, filter_properties):
 
         # NOTE(Rongze Zhu): It is a simple solution for bug 1008866
         # If snapshot_id is set, make the call create volume directly to
         # the volume host where the snapshot resides instead of passing it
-        # through the scheduer. So snapshot can be copy to new volume.
+        # through the scheduler. So the snapshot can be copied to the volume.
+
+        volume_id = request_spec['volume_id']
+        snapshot_id = request_spec['snapshot_id']
+        image_id = request_spec['image_id']
 
         if snapshot_id and FLAGS.snapshot_same_host:
             snapshot_ref = self.db.snapshot_get(context, snapshot_id)
@@ -208,8 +221,10 @@ class API(base.Base):
             self.scheduler_rpcapi.create_volume(context,
                 FLAGS.volume_topic,
                 volume_id,
-                snapshot_id=snapshot_id,
-                image_id=image_id)
+                snapshot_id,
+                image_id,
+                request_spec=request_spec,
+                filter_properties=filter_properties)
 
     @wrap_check_policy
     def delete(self, context, volume, force=False):
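
For completeness, a hedged sketch of the backward-compatibility path in
SchedulerManager.create_volume() above: when a pre-1.2 caller omits
request_spec, the manager rebuilds one from the database before
delegating to the driver. This is simplified from the manager.py hunk;
db_volume_get and db_volume_type_get are injected stand-ins for the real
cinder.db calls, not actual Cinder APIs.

def build_request_spec(volume_id, snapshot_id, image_id,
                       db_volume_get, db_volume_type_get):
    """Rebuild a request_spec for pre-1.2 callers (simplified)."""
    volume_ref = db_volume_get(volume_id)
    volume_properties = {
        'size': volume_ref.get('size'),
        'availability_zone': volume_ref.get('availability_zone'),
        'volume_type_id': volume_ref.get('volume_type_id'),
    }
    return {
        'volume_id': volume_id,
        'snapshot_id': snapshot_id,
        'image_id': image_id,
        'volume_properties': volume_properties,
        'volume_type': dict(db_volume_type_get(
            volume_ref.get('volume_type_id'))),
    }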