]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Allow create_volume() to retry when exception happened
authorZhiteng Huang <zhiteng.huang@intel.com>
Fri, 25 Jan 2013 18:18:03 +0000 (02:18 +0800)
committerZhiteng Huang <zhiteng.huang@intel.com>
Thu, 14 Feb 2013 15:36:58 +0000 (23:36 +0800)
Due to the fact that certain volume back-ends cannot easily report
simple total_capacity_gb/free_capacity_gb for their internal
implementation complexity, scheduler is updated to let those back-ends
who report unclear capacity pass capacity filter, thus there is chance
create_volume() request would fail. In a more general case, when a
volume back-end failed to serve create_volume request for whatever reason
it'd be good that we have a mechanism to 'retry' the request.

So the idea is when volume manager catches the exception from
driver.create_volume() call, it checks if the request is allowed to be
rescheduled (requests that are not: clone volume and create volume from
snapshot while 'snapshot_same_host' option is true), it composes a new
request back to scheduler with additional information to mark this specific
back-end has been tried (so that scheduler may choose to skip this back-end
if needed).  Scheduler is (filter scheduler only, simple and chance
scheduler is not supported) is updated as well so that it only retry
scheduler_max_attempts times.  In order to skip/rule out previously tried
back-ends in next schedule task, a new RetryFilter is added.

Changes:
1) volume RPC API create_volume() is updated with new parameters
to save original request information in case rescheduling is needed.
This bumps volume RPC API to 1.4.

2) implementation of create_volume() method in volume API is
refactored in order to distinguish if a request is allowed to
do reschedule (i.e. currently create volume from source volume
bypasses scheduler, not rescheduling is allowed).

3) add reschedule functionality in create_volume() of volume
manager so that it's able to send the request back to scheduler.

4) add schedule_max_attempts config option in scheduler/driver.py

5) add RetryFitler

6) change scheduler_driver default option to FilterScheduler

Change-Id: Ia46b5eb4dc033d73734b6aea82ada34ba5731075

12 files changed:
cinder/scheduler/driver.py
cinder/scheduler/filter_scheduler.py
cinder/scheduler/filters/retry_filter.py [new file with mode: 0644]
cinder/scheduler/manager.py
cinder/tests/scheduler/test_filter_scheduler.py
cinder/tests/scheduler/test_host_filters.py
cinder/tests/test_volume.py
cinder/tests/test_volume_rpcapi.py
cinder/volume/api.py
cinder/volume/manager.py
cinder/volume/rpcapi.py
setup.py

index f7d72983fef48a54dbd5b2bd433ca26b262439d6..a5329ff0e65013fcd5436fc0e65062885a4cda00 100644 (file)
@@ -33,7 +33,11 @@ from cinder.volume import rpcapi as volume_rpcapi
 scheduler_driver_opts = [
     cfg.StrOpt('scheduler_host_manager',
                default='cinder.scheduler.host_manager.HostManager',
-               help='The scheduler host manager class to use'), ]
+               help='The scheduler host manager class to use'),
+    cfg.IntOpt('scheduler_max_attempts',
+               default=3,
+               help='Maximum number of attempts to schedule an volume'),
+]
 
 FLAGS = flags.FLAGS
 FLAGS.register_opts(scheduler_driver_opts)
index ea8dc4ed7a564fe86c080642b778c94ecc74a223..24feebb4c2877343622419a2d9fcf9fb04cb0b40 100644 (file)
@@ -40,6 +40,7 @@ class FilterScheduler(driver.Scheduler):
         super(FilterScheduler, self).__init__(*args, **kwargs)
         self.cost_function_cache = None
         self.options = scheduler_options.SchedulerOptions()
+        self.max_attempts = self._max_attempts()
 
     def schedule(self, context, topic, method, *args, **kwargs):
         """The schedule() contract requires we return the one
@@ -74,8 +75,91 @@ class FilterScheduler(driver.Scheduler):
         image_id = request_spec['image_id']
 
         updated_volume = driver.volume_update_db(context, volume_id, host)
+        self._post_select_populate_filter_properties(filter_properties,
+                                                     weighed_host.obj)
+
+        # context is not serializable
+        filter_properties.pop('context', None)
+
         self.volume_rpcapi.create_volume(context, updated_volume, host,
-                                         snapshot_id, image_id)
+                                         request_spec=request_spec,
+                                         filter_properties=filter_properties,
+                                         allow_reschedule=True,
+                                         snapshot_id=snapshot_id,
+                                         image_id=image_id)
+
+    def _post_select_populate_filter_properties(self, filter_properties,
+                                                host_state):
+        """Add additional information to the filter properties after a host has
+        been selected by the scheduling process.
+        """
+        # Add a retry entry for the selected volume backend:
+        self._add_retry_host(filter_properties, host_state.host)
+
+    def _add_retry_host(self, filter_properties, host):
+        """Add a retry entry for the selected volume backend. In the event that
+        the request gets re-scheduled, this entry will signal that the given
+        backend has already been tried.
+        """
+        retry = filter_properties.get('retry', None)
+        if not retry:
+            return
+        hosts = retry['hosts']
+        hosts.append(host)
+
+    def _max_attempts(self):
+        max_attempts = FLAGS.scheduler_max_attempts
+        if max_attempts < 1:
+            msg = _("Invalid value for 'scheduler_max_attempts', "
+                    "must be >=1")
+            raise exception.InvalidParameterValue(err=msg)
+        return max_attempts
+
+    def _log_volume_error(self, volume_id, retry):
+        """If the request contained an exception from a previous volume
+        create operation, log it to aid debugging
+        """
+        exc = retry.pop('exc', None)  # string-ified exception from volume
+        if not exc:
+            return  # no exception info from a previous attempt, skip
+
+        hosts = retry.get('hosts', None)
+        if not hosts:
+            return  # no previously attempted hosts, skip
+
+        last_host = hosts[-1]
+        msg = _("Error from last vol-service: %(last_host)s : "
+                "%(exc)s") % locals()
+        LOG.error(msg, volume_id=volume_id)
+
+    def _populate_retry(self, filter_properties, properties):
+        """Populate filter properties with history of retries for this
+        request. If maximum retries is exceeded, raise NoValidHost.
+        """
+        max_attempts = self.max_attempts
+        retry = filter_properties.pop('retry', {})
+
+        if max_attempts == 1:
+            # re-scheduling is disabled.
+            return
+
+        # retry is enabled, update attempt count:
+        if retry:
+            retry['num_attempts'] += 1
+        else:
+            retry = {
+                'num_attempts': 1,
+                'hosts': []  # list of volume service hosts tried
+            }
+        filter_properties['retry'] = retry
+
+        volume_id = properties.get('volume_id')
+        self._log_volume_error(volume_id, retry)
+
+        if retry['num_attempts'] > max_attempts:
+            msg = _("Exceeded max scheduling attempts %(max_attempts)d for "
+                    "volume %(volume_id)s") % locals()
+            raise exception.NoValidHost(reason=msg)
 
     def _schedule(self, context, request_spec, filter_properties=None):
         """Returns a list of hosts that meet the required specs,
@@ -84,9 +168,9 @@ class FilterScheduler(driver.Scheduler):
         elevated = context.elevated()
 
         volume_properties = request_spec['volume_properties']
-        # Since Nova is using mixed filters from Oslo and it's own, which
-        # takes 'resource_XX' and 'instance_XX' as input respectively, copying
-        # 'instance_XX' to 'resource_XX' will make both filters happy.
+        # Since Cinder is using mixed filters from Oslo and it's own, which
+        # takes 'resource_XX' and 'volume_XX' as input respectively, copying
+        # 'volume_XX' to 'resource_XX' will make both filters happy.
         resource_properties = volume_properties.copy()
         volume_type = request_spec.get("volume_type", None)
         resource_type = request_spec.get("volume_type", None)
@@ -96,6 +180,8 @@ class FilterScheduler(driver.Scheduler):
 
         if filter_properties is None:
             filter_properties = {}
+        self._populate_retry(filter_properties, resource_properties)
+
         filter_properties.update({'context': context,
                                   'request_spec': request_spec,
                                   'config_options': config_options,
diff --git a/cinder/scheduler/filters/retry_filter.py b/cinder/scheduler/filters/retry_filter.py
new file mode 100644 (file)
index 0000000..ae84a4e
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.scheduler import filters
+
+LOG = logging.getLogger(__name__)
+
+
+class RetryFilter(filters.BaseHostFilter):
+    """Filter out nodes that have already been attempted for scheduling
+    purposes
+    """
+
+    def host_passes(self, host_state, filter_properties):
+        """Skip nodes that have already been attempted."""
+        retry = filter_properties.get('retry', None)
+        if not retry:
+            # Re-scheduling is disabled
+            LOG.debug("Re-scheduling is disabled")
+            return True
+
+        hosts = retry.get('hosts', [])
+        host = host_state.host
+
+        passes = host not in hosts
+        pass_msg = "passes" if passes else "fails"
+
+        LOG.debug(_("Host %(host)s %(pass_msg)s.  Previously tried hosts: "
+                    "%(hosts)s") % locals())
+
+        # Host passes if it's not in the list of previously attempted hosts:
+        return passes
index f3f170dc672ff0fa1c60a53070e34835b3082960..92247a89c643a5c39c0e5da745fe96c21b317b6e 100644 (file)
@@ -37,8 +37,8 @@ from cinder.volume import rpcapi as volume_rpcapi
 LOG = logging.getLogger(__name__)
 
 scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
-                                  default='cinder.scheduler.simple.'
-                                          'SimpleScheduler',
+                                  default='cinder.scheduler.filter_scheduler.'
+                                          'FilterScheduler',
                                   help='Default scheduler driver to use')
 
 FLAGS = flags.FLAGS
index 064c2746183a3fa159a64e930a0d52ff3d84fe19..6579d6cb9dfcf40ac9e8b017ffc35d6438f59142 100644 (file)
@@ -22,6 +22,7 @@ from cinder import test
 
 from cinder.openstack.common.scheduler import weights
 from cinder.scheduler import filter_scheduler
+from cinder.scheduler import host_manager
 from cinder.tests.scheduler import fakes
 from cinder.tests.scheduler import test_scheduler
 from cinder.tests import utils as test_utils
@@ -53,7 +54,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
                         'volume_type': {'name': 'LVM_iSCSI'},
                         'volume_id': ['fake-id1']}
         self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
-                          fake_context, request_spec, None)
+                          fake_context, request_spec, {})
 
     @test.skip_if(not test_utils.is_cinder_installed(),
                   'Test requires Cinder installed (try setup.py develop')
@@ -78,7 +79,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
                         'volume_type': {'name': 'LVM_iSCSI'},
                         'volume_id': ['fake-id1']}
         self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
-                          fake_context, request_spec, None)
+                          fake_context, request_spec, {})
         self.assertTrue(self.was_admin)
 
     @test.skip_if(not test_utils.is_cinder_installed(),
@@ -110,3 +111,108 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
         self.mox.ReplayAll()
         weighed_host = sched._schedule(fake_context, request_spec, {})
         self.assertTrue(weighed_host.obj is not None)
+
+    def test_max_attempts(self):
+        self.flags(scheduler_max_attempts=4)
+
+        sched = fakes.FakeFilterScheduler()
+        self.assertEqual(4, sched._max_attempts())
+
+    def test_invalid_max_attempts(self):
+        self.flags(scheduler_max_attempts=0)
+
+        self.assertRaises(exception.InvalidParameterValue,
+                          fakes.FakeFilterScheduler)
+
+    def test_retry_disabled(self):
+        # Retry info should not get populated when re-scheduling is off.
+        self.flags(scheduler_max_attempts=1)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+        filter_properties = {}
+
+        sched._schedule(self.context, request_spec,
+                        filter_properties=filter_properties)
+
+        # should not have retry info in the populated filter properties:
+        self.assertFalse("retry" in filter_properties)
+
+    def test_retry_attempt_one(self):
+        # Test retry logic on initial scheduling attempt.
+        self.flags(scheduler_max_attempts=2)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+        filter_properties = {}
+
+        sched._schedule(self.context, request_spec,
+                        filter_properties=filter_properties)
+
+        num_attempts = filter_properties['retry']['num_attempts']
+        self.assertEqual(1, num_attempts)
+
+    def test_retry_attempt_two(self):
+        # Test retry logic when re-scheduling.
+        self.flags(scheduler_max_attempts=2)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+
+        retry = dict(num_attempts=1)
+        filter_properties = dict(retry=retry)
+
+        sched._schedule(self.context, request_spec,
+                        filter_properties=filter_properties)
+
+        num_attempts = filter_properties['retry']['num_attempts']
+        self.assertEqual(2, num_attempts)
+
+    def test_retry_exceeded_max_attempts(self):
+        # Test for necessary explosion when max retries is exceeded.
+        self.flags(scheduler_max_attempts=2)
+        sched = fakes.FakeFilterScheduler()
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+
+        retry = dict(num_attempts=2)
+        filter_properties = dict(retry=retry)
+
+        self.assertRaises(exception.NoValidHost, sched._schedule, self.context,
+                          request_spec, filter_properties=filter_properties)
+
+    def test_add_retry_host(self):
+        retry = dict(num_attempts=1, hosts=[])
+        filter_properties = dict(retry=retry)
+        host = "fakehost"
+
+        sched = fakes.FakeFilterScheduler()
+        sched._add_retry_host(filter_properties, host)
+
+        hosts = filter_properties['retry']['hosts']
+        self.assertEqual(1, len(hosts))
+        self.assertEqual(host, hosts[0])
+
+    def test_post_select_populate(self):
+        # Test addition of certain filter props after a node is selected.
+        retry = {'hosts': [], 'num_attempts': 1}
+        filter_properties = {'retry': retry}
+        sched = fakes.FakeFilterScheduler()
+
+        host_state = host_manager.HostState('host')
+        host_state.total_capacity_gb = 1024
+        sched._post_select_populate_filter_properties(filter_properties,
+                                                      host_state)
+
+        self.assertEqual('host',
+                         filter_properties['retry']['hosts'][0])
+
+        self.assertEqual(1024, host_state.total_capacity_gb)
index a6fb856cb042bcae66ccea0df0bef08024e3c105..810056fd1a9eceeddd5feaf06eeeccd7944f6918 100644 (file)
@@ -128,3 +128,32 @@ class HostFiltersTestCase(test.TestCase):
                                     'updated_at': None,
                                     'service': service})
         self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    @test.skip_if(not test_utils.is_cinder_installed(),
+                  'Test requires Cinder installed')
+    def test_retry_filter_disabled(self):
+        # Test case where retry/re-scheduling is disabled.
+        filt_cls = self.class_map['RetryFilter']()
+        host = fakes.FakeHostState('host1', {})
+        filter_properties = {}
+        self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    @test.skip_if(not test_utils.is_cinder_installed(),
+                  'Test requires Cinder installed')
+    def test_retry_filter_pass(self):
+        # Node not previously tried.
+        filt_cls = self.class_map['RetryFilter']()
+        host = fakes.FakeHostState('host1', {})
+        retry = dict(num_attempts=2, hosts=['host2'])
+        filter_properties = dict(retry=retry)
+        self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    @test.skip_if(not test_utils.is_cinder_installed(),
+                  'Test requires Cinder installed')
+    def test_retry_filter_fail(self):
+        # Node was already tried.
+        filt_cls = self.class_map['RetryFilter']()
+        host = fakes.FakeHostState('host1', {})
+        retry = dict(num_attempts=1, hosts=['host1'])
+        filter_properties = dict(retry=retry)
+        self.assertFalse(filt_cls.host_passes(host, filter_properties))
index 6170da2e9c647ada539883c2e2c9ae5a1f8bfe3e..1b661f340b866cd524a07d8e337dbf75bdc13fb3 100644 (file)
@@ -508,7 +508,8 @@ class VolumeTestCase(test.TestCase):
         def fake_local_path(volume):
             return dst_path
 
-        def fake_copy_image_to_volume(context, volume, image_id):
+        def fake_copy_image_to_volume(context, volume,
+                                      image_service, image_id):
             pass
 
         def fake_fetch_to_raw(context, image_service, image_id, vol_path):
@@ -545,11 +546,6 @@ class VolumeTestCase(test.TestCase):
             db.volume_destroy(self.context, volume_id)
             os.unlink(dst_path)
 
-    def test_create_volume_from_image_status_downloading(self):
-        """Verify that before copying image to volume, it is in downloading
-        state."""
-        self._create_volume_from_image('downloading', True)
-
     def test_create_volume_from_image_status_available(self):
         """Verify that before copying image to volume, it is in available
         state."""
@@ -577,7 +573,7 @@ class VolumeTestCase(test.TestCase):
         self.assertRaises(exception.ImageNotFound,
                           self.volume.create_volume,
                           self.context,
-                          volume_id,
+                          volume_id, None, None, None,
                           None,
                           image_id)
         volume = db.volume_get(self.context, volume_id)
index 5fdc8092afb31c2ca7312310ff2fcd616648b6b7..7a75224b8191136a59641beb896d171a5ed27a37 100644 (file)
@@ -112,10 +112,13 @@ class VolumeRpcAPITestCase(test.TestCase):
                               rpc_method='cast',
                               volume=self.fake_volume,
                               host='fake_host1',
+                              request_spec='fake_request_spec',
+                              filter_properties='fake_properties',
+                              allow_reschedule=True,
                               snapshot_id='fake_snapshot_id',
                               image_id='fake_image_id',
                               source_volid='fake_src_id',
-                              version='1.1')
+                              version='1.4')
 
     def test_delete_volume(self):
         self._test_volume_api('delete_volume',
index eadced454830bacd7ec87e45963d8aa46b933462..ffd9d8bc1426f387df4ab0591ddf89888bccb522 100644 (file)
@@ -242,8 +242,12 @@ class API(base.Base):
             self.volume_rpcapi.create_volume(context,
                                              volume_ref,
                                              volume_ref['host'],
-                                             snapshot_id,
-                                             image_id)
+                                             request_spec=request_spec,
+                                             filter_properties=
+                                             filter_properties,
+                                             allow_reschedule=False,
+                                             snapshot_id=snapshot_id,
+                                             image_id=image_id)
         elif source_volid:
             source_volume_ref = self.db.volume_get(context,
                                                    source_volid)
@@ -255,18 +259,22 @@ class API(base.Base):
             self.volume_rpcapi.create_volume(context,
                                              volume_ref,
                                              volume_ref['host'],
-                                             snapshot_id,
-                                             image_id,
-                                             source_volid)
+                                             request_spec=request_spec,
+                                             filter_properties=
+                                             filter_properties,
+                                             allow_reschedule=False,
+                                             snapshot_id=snapshot_id,
+                                             image_id=image_id,
+                                             source_volid=source_volid)
         else:
-            self.scheduler_rpcapi.create_volume(
-                context,
-                FLAGS.volume_topic,
-                volume_id,
-                snapshot_id,
-                image_id,
-                request_spec=request_spec,
-                filter_properties=filter_properties)
+            self.scheduler_rpcapi.create_volume(context,
+                                                FLAGS.volume_topic,
+                                                volume_id,
+                                                snapshot_id,
+                                                image_id,
+                                                request_spec=request_spec,
+                                                filter_properties=
+                                                filter_properties)
 
     @wrap_check_policy
     def delete(self, context, volume, force=False):
index 2b88d9a6168955e7b286756b764e57bae03d7b85..f194f624c1db672899395f68f3f06411e5b6fdc5 100644 (file)
@@ -37,6 +37,10 @@ intact.
 
 """
 
+
+import sys
+import traceback
+
 from cinder import context
 from cinder import exception
 from cinder import flags
@@ -103,7 +107,7 @@ MAPPING = {
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.3'
+    RPC_API_VERSION = '1.4'
 
     def __init__(self, volume_driver=None, *args, **kwargs):
         """Load the driver from the one specified in args, or from flags."""
@@ -146,10 +150,44 @@ class VolumeManager(manager.SchedulerDependentManager):
         # collect and publish service capabilities
         self.publish_service_capabilities(ctxt)
 
-    def create_volume(self, context, volume_id, snapshot_id=None,
-                      image_id=None, source_volid=None):
+    def _create_volume(self, context, volume_ref, snapshot_ref,
+                       srcvol_ref, image_service, image_id, image_location):
+        cloned = None
+        model_update = False
+
+        if all(x is None for x in(snapshot_ref, image_location, srcvol_ref)):
+            model_update = self.driver.create_volume(volume_ref)
+        elif snapshot_ref is not None:
+            model_update = self.driver.create_volume_from_snapshot(
+                volume_ref,
+                snapshot_ref)
+        elif srcvol_ref is not None:
+            model_update = self.driver.create_cloned_volume(volume_ref,
+                                                            srcvol_ref)
+        else:
+            # create the volume from an image
+            cloned = self.driver.clone_image(volume_ref, image_location)
+            if not cloned:
+                model_update = self.driver.create_volume(volume_ref)
+                #copy the image onto the volume.
+                status = 'downloading'
+                self.db.volume_update(context,
+                                      volume_ref['id'],
+                                      {'status': status})
+                self._copy_image_to_volume(context,
+                                           volume_ref,
+                                           image_service,
+                                           image_id)
+
+        return model_update, cloned
+
+    def create_volume(self, context, volume_id, request_spec=None,
+                      filter_properties=None, allow_reschedule=True,
+                      snapshot_id=None, image_id=None, source_volid=None):
         """Creates and exports the volume."""
         context = context.elevated()
+        if filter_properties is None:
+            filter_properties = {}
         volume_ref = self.db.volume_get(context, volume_id)
         self._notify_about_volume_usage(context, volume_ref, "create.start")
         LOG.info(_("volume %s: creating"), volume_ref['name'])
@@ -167,36 +205,52 @@ class VolumeManager(manager.SchedulerDependentManager):
             vol_size = volume_ref['size']
             LOG.debug(_("volume %(vol_name)s: creating lv of"
                         " size %(vol_size)sG") % locals())
-            if all(x is None for x in(snapshot_id, image_id, source_volid)):
-                model_update = self.driver.create_volume(volume_ref)
-            elif snapshot_id is not None:
+            snapshot_ref = None
+            sourcevol_ref = None
+            image_service = None
+            image_location = None
+            image_meta = None
+
+            if snapshot_id is not None:
                 snapshot_ref = self.db.snapshot_get(context, snapshot_id)
-                model_update = self.driver.create_volume_from_snapshot(
-                    volume_ref,
-                    snapshot_ref)
             elif source_volid is not None:
-                src_vref = self.db.volume_get(context, source_volid)
-                model_update = self.driver.create_cloned_volume(volume_ref,
-                                                                src_vref)
-                self.db.volume_glance_metadata_copy_from_volume_to_volume(
-                    context,
-                    source_volid,
-                    volume_id)
-            else:
+                sourcevol_ref = self.db.volume_get(context, source_volid)
+            elif image_id is not None:
                 # create the volume from an image
                 image_service, image_id = \
                     glance.get_remote_image_service(context,
                                                     image_id)
                 image_location = image_service.get_location(context, image_id)
                 image_meta = image_service.show(context, image_id)
-                cloned = self.driver.clone_image(volume_ref, image_location)
-                if not cloned:
-                    model_update = self.driver.create_volume(volume_ref)
-                    status = 'downloading'
+
+            try:
+                model_update, cloned = self._create_volume(context,
+                                                           volume_ref,
+                                                           snapshot_ref,
+                                                           sourcevol_ref,
+                                                           image_service,
+                                                           image_id,
+                                                           image_location)
+            except Exception:
+                # restore source volume status before reschedule
+                if sourcevol_ref is not None:
+                    self.db.volume_update(context, sourcevol_ref['id'],
+                                          {'status': sourcevol_ref['status']})
+                exc_info = sys.exc_info()
+                # try to re-schedule volume:
+                self._reschedule_or_reraise(context, volume_id, exc_info,
+                                            snapshot_id, image_id,
+                                            request_spec, filter_properties,
+                                            allow_reschedule)
 
             if model_update:
                 volume_ref = self.db.volume_update(
                     context, volume_ref['id'], model_update)
+            if sourcevol_ref is not None:
+                self.db.volume_glance_metadata_copy_from_volume_to_volume(
+                    context,
+                    source_volid,
+                    volume_id)
 
             LOG.debug(_("volume %s: creating export"), volume_ref['name'])
             model_update = self.driver.create_export(context, volume_ref)
@@ -214,13 +268,6 @@ class VolumeManager(manager.SchedulerDependentManager):
                                                           volume_ref['id'],
                                                           snapshot_id)
 
-        now = timeutils.utcnow()
-        self.db.volume_update(context,
-                              volume_ref['id'], {'status': status,
-                                                 'launched_at': now})
-        LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
-        self._reset_stats()
-
         if image_id and not cloned:
             if image_meta:
                 # Copy all of the Glance image properties to the
@@ -239,11 +286,91 @@ class VolumeManager(manager.SchedulerDependentManager):
                                                           volume_ref['id'],
                                                           key, value)
 
-            # Copy the image onto the volume.
-            self._copy_image_to_volume(context, volume_ref, image_id)
+        now = timeutils.utcnow()
+        self.db.volume_update(context,
+                              volume_ref['id'], {'status': status,
+                                                 'launched_at': now})
+        LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
+        self._reset_stats()
+
         self._notify_about_volume_usage(context, volume_ref, "create.end")
         return volume_ref['id']
 
+    def _log_original_error(self, exc_info, volume_id):
+        type_, value, tb = exc_info
+        LOG.error(_('Error: %s') %
+                  traceback.format_exception(type_, value, tb),
+                  volume_id=volume_id)
+
+    def _reschedule_or_reraise(self, context, volume_id, exc_info,
+                               snapshot_id, image_id, request_spec,
+                               filter_properties, allow_reschedule):
+        """Try to re-schedule the create or re-raise the original error to
+        error out the volume.
+        """
+        if not allow_reschedule:
+            raise exc_info[0], exc_info[1], exc_info[2]
+
+        rescheduled = False
+
+        try:
+            method_args = (FLAGS.volume_topic, volume_id, snapshot_id,
+                           image_id, request_spec, filter_properties)
+
+            rescheduled = self._reschedule(context, request_spec,
+                                           filter_properties, volume_id,
+                                           self.scheduler_rpcapi.create_volume,
+                                           method_args,
+                                           exc_info)
+
+        except Exception:
+            rescheduled = False
+            LOG.exception(_("Error trying to reschedule"),
+                          volume_id=volume_id)
+
+        if rescheduled:
+            # log the original build error
+            self._log_original_error(exc_info, volume_id)
+        else:
+            # not re-scheduling
+            raise exc_info[0], exc_info[1], exc_info[2]
+
+    def _reschedule(self, context, request_spec, filter_properties,
+                    volume_id, scheduler_method, method_args,
+                    exc_info=None):
+        """Attempt to re-schedule a volume operation."""
+
+        retry = filter_properties.get('retry', None)
+        if not retry:
+            # no retry information, do not reschedule.
+            LOG.debug(_("Retry info not present, will not reschedule"),
+                      volume_id=volume_id)
+            return
+
+        if not request_spec:
+            LOG.debug(_("No request spec, will not reschedule"),
+                      volume_id=volume_id)
+            return
+
+        request_spec['volume_id'] = [volume_id]
+
+        LOG.debug(_("Re-scheduling %(method)s: attempt %(num)d") %
+                  {'method': scheduler_method.func_name,
+                   'num': retry['num_attempts']}, volume_id=volume_id)
+
+        # reset the volume state:
+        now = timeutils.utcnow()
+        self.db.volume_update(context, volume_id,
+                              {'status': 'creating',
+                               'scheduled_at': now})
+
+        if exc_info:
+            # stringify to avoid circular ref problem in json serialization:
+            retry['exc'] = traceback.format_exception(*exc_info)
+
+        scheduler_method(context, *method_args)
+        return True
+
     def delete_volume(self, context, volume_id):
         """Deletes and unexports volume."""
         context = context.elevated()
@@ -408,23 +535,14 @@ class VolumeManager(manager.SchedulerDependentManager):
                 volume_ref['name'] not in volume_ref['provider_location']):
             self.driver.ensure_export(context, volume_ref)
 
-    def _copy_image_to_volume(self, context, volume, image_id):
+    def _copy_image_to_volume(self, context, volume, image_service, image_id):
         """Downloads Glance image to the specified volume. """
         volume_id = volume['id']
-        payload = {'volume_id': volume_id, 'image_id': image_id}
-        try:
-            image_service, image_id = glance.get_remote_image_service(context,
-                                                                      image_id)
-            self.driver.copy_image_to_volume(context, volume, image_service,
-                                             image_id)
-            LOG.debug(_("Downloaded image %(image_id)s to %(volume_id)s "
-                        "successfully") % locals())
-            self.db.volume_update(context, volume_id,
-                                  {'status': 'available'})
-        except Exception, error:
-            with excutils.save_and_reraise_exception():
-                payload['message'] = unicode(error)
-                self.db.volume_update(context, volume_id, {'status': 'error'})
+        self.driver.copy_image_to_volume(context, volume,
+                                         image_service,
+                                         image_id)
+        LOG.debug(_("Downloaded image %(image_id)s to %(volume_id)s "
+                    "successfully") % locals())
 
     def copy_volume_to_image(self, context, volume_id, image_meta):
         """Uploads the specified volume to Glance.
index d9f72f0a06688aaf7da9f85546f0d1a04ba71a55..b4098c8b87de9a4b6cdf6848ca5382cefcb142f2 100644 (file)
@@ -36,6 +36,8 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
         1.1 - Adds clone volume option to create_volume.
         1.2 - Add publish_service_capabilities() method.
         1.3 - Pass all image metadata (not just ID) in copy_volume_to_image
+        1.4 - Add request_spec, filter_properties and
+              allow_reschedule arguments to create_volume().
     '''
 
     BASE_RPC_API_VERSION = '1.0'
@@ -46,18 +48,23 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
             default_version=self.BASE_RPC_API_VERSION)
 
     def create_volume(self, ctxt, volume, host,
+                      request_spec, filter_properties,
+                      allow_reschedule=True,
                       snapshot_id=None, image_id=None,
                       source_volid=None):
         self.cast(ctxt,
                   self.make_msg('create_volume',
                                 volume_id=volume['id'],
+                                request_spec=request_spec,
+                                filter_properties=filter_properties,
+                                allow_reschedule=allow_reschedule,
                                 snapshot_id=snapshot_id,
                                 image_id=image_id,
                                 source_volid=source_volid),
                   topic=rpc.queue_get_for(ctxt,
                                           self.topic,
                                           host),
-                  version='1.1')
+                  version='1.4')
 
     def delete_volume(self, ctxt, volume):
         self.cast(ctxt,
index 2770d554fb01513ca35fcbbd8305945059beee87..691bc388b174fc346e82de492b9cc5557df25756 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -34,6 +34,8 @@ filters = [
     "cinder.scheduler.filters.capacity_filter:CapacityFilter",
     "JsonFilter = "
     "cinder.openstack.common.scheduler.filters.json_filter:JsonFilter",
+    "RetryFilter = "
+    "cinder.scheduler.filters.retry_filter:RetryFilter",
 ]
 
 weights = [