]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Implement function to manage/unmanage snapshots
authorwanghao <wanghao749@huawei.com>
Wed, 31 Dec 2014 04:06:53 +0000 (12:06 +0800)
committerwanghao <wanghao749@huawei.com>
Wed, 26 Aug 2015 06:16:34 +0000 (14:16 +0800)
1. Add snapshots manage action in contrib api and unmanage
action in API extension.
2. Implement manage_existing_snapshot in manager and flow
to import existing snapshots.
3. Add manage/unmanage support in the LVM driver

Implements: blueprint support-import-export-snapshots
DocImpact
APIImpact
Similar to volume manage/unmanage.
Change-Id: Ib6cf8392b0bc99f803316991f31a75788677e9cf

17 files changed:
cinder/api/contrib/snapshot_manage.py [new file with mode: 0644]
cinder/api/contrib/snapshot_unmanage.py [new file with mode: 0644]
cinder/tests/unit/api/contrib/test_snapshot_manage.py [new file with mode: 0644]
cinder/tests/unit/api/contrib/test_snapshot_unmanage.py [new file with mode: 0644]
cinder/tests/unit/policy.json
cinder/tests/unit/test_volume.py
cinder/tests/unit/test_volume_rpcapi.py
cinder/tests/unit/test_volume_utils.py
cinder/volume/api.py
cinder/volume/driver.py
cinder/volume/drivers/lvm.py
cinder/volume/flows/common.py
cinder/volume/flows/manager/manage_existing_snapshot.py [new file with mode: 0644]
cinder/volume/manager.py
cinder/volume/rpcapi.py
cinder/volume/utils.py
etc/cinder/policy.json

diff --git a/cinder/api/contrib/snapshot_manage.py b/cinder/api/contrib/snapshot_manage.py
new file mode 100644 (file)
index 0000000..8c7fefb
--- /dev/null
@@ -0,0 +1,145 @@
+#   Copyright 2015 Huawei Technologies Co., Ltd.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from webob import exc
+
+from cinder.api import extensions
+from cinder.api.openstack import wsgi
+from cinder.api.v2 import snapshots
+from cinder.api.views import snapshots as snapshot_views
+from cinder import exception
+from cinder.i18n import _
+from cinder import volume as cinder_volume
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+authorize = extensions.extension_authorizer('snapshot', 'snapshot_manage')
+
+
+class SnapshotManageController(wsgi.Controller):
+    """The /os-snapshot-manage controller for the OpenStack API."""
+
+    _view_builder_class = snapshot_views.ViewBuilder
+
+    def __init__(self, *args, **kwargs):
+        super(SnapshotManageController, self).__init__(*args, **kwargs)
+        self.volume_api = cinder_volume.API()
+
+    @wsgi.response(202)
+    @wsgi.serializers(xml=snapshots.SnapshotTemplate)
+    def create(self, req, body):
+        """Instruct Cinder to manage a storage snapshot object.
+
+        Manages an existing backend storage snapshot object (e.g. a Linux
+        logical volume or a SAN disk) by creating the Cinder objects required
+        to manage it, and possibly renaming the backend storage snapshot object
+        (driver dependent).
+
+        From an API perspective, this operation behaves very much like a
+        snapshot creation operation.
+
+        Required HTTP Body:
+
+        {
+         "snapshot":
+          {
+           "volume_id": <Cinder volume already exists in volume backend>,
+           "ref":  <Driver-specific reference to the existing storage object>,
+          }
+        }
+
+        See the appropriate Cinder drivers' implementations of the
+        manage_snapshot method to find out the accepted format of 'ref'.
+        For example,in LVM driver, it will be the logic volume name of snapshot
+        which you want to manage.
+
+        This API call will return with an error if any of the above elements
+        are missing from the request, or if the 'volume_id' element refers to
+        a cinder volume that could not be found.
+
+        The snapshot will later enter the error state if it is discovered that
+        'ref' is bad.
+
+        Optional elements to 'snapshot' are:
+            name               A name for the new snapshot.
+            description        A description for the new snapshot.
+            metadata           Key/value pairs to be associated with the new
+                               snapshot.
+        """
+        context = req.environ['cinder.context']
+        authorize(context)
+
+        if not self.is_valid_body(body, 'snapshot'):
+            msg = _("Missing required element snapshot in request body.")
+            raise exc.HTTPBadRequest(explanation=msg)
+
+        snapshot = body['snapshot']
+
+        # Check that the required keys are present, return an error if they
+        # are not.
+        required_keys = ('ref', 'volume_id')
+        missing_keys = set(required_keys) - set(snapshot.keys())
+
+        if missing_keys:
+            msg = _("The following elements are required: "
+                    "%s") % ', '.join(missing_keys)
+            raise exc.HTTPBadRequest(explanation=msg)
+
+        # Check whether volume exists
+        volume_id = snapshot['volume_id']
+        try:
+            volume = self.volume_api.get(context, volume_id)
+        except exception.VolumeNotFound:
+            msg = _("Volume: %s could not be found.") % volume_id
+            raise exc.HTTPNotFound(explanation=msg)
+
+        LOG.debug('Manage snapshot request body: %s', body)
+
+        snapshot_parameters = {}
+
+        snapshot_parameters['metadata'] = snapshot.get('metadata', None)
+        snapshot_parameters['description'] = snapshot.get('description', None)
+        # NOTE(wanghao) if name in request body, we are overriding the 'name'
+        snapshot_parameters['name'] = snapshot.get('name',
+                                                   snapshot.get('display_name')
+                                                   )
+
+        try:
+            new_snapshot = self.volume_api.manage_existing_snapshot(
+                context,
+                snapshot['ref'],
+                volume,
+                **snapshot_parameters)
+        except exception.ServiceNotFound:
+            msg = _("Service %s not found.") % CONF.volume_topic
+            raise exc.HTTPNotFound(explanation=msg)
+
+        return self._view_builder.detail(req, new_snapshot)
+
+
+class Snapshot_manage(extensions.ExtensionDescriptor):
+    """Allows existing backend storage to be 'managed' by Cinder."""
+
+    name = 'SnapshotManage'
+    alias = 'os-snapshot-manage'
+    namespace = ('http://docs.openstack.org/volume/ext/'
+                 'os-snapshot-manage/api/v1')
+    updated = '2014-12-31T00:00:00+00:00'
+
+    def get_resources(self):
+        controller = SnapshotManageController()
+        return [extensions.ResourceExtension(Snapshot_manage.alias,
+                                             controller)]
diff --git a/cinder/api/contrib/snapshot_unmanage.py b/cinder/api/contrib/snapshot_unmanage.py
new file mode 100644 (file)
index 0000000..5ddf96b
--- /dev/null
@@ -0,0 +1,77 @@
+#   Copyright 2015 Huawei Technologies Co., Ltd.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+
+from oslo_log import log as logging
+import webob
+from webob import exc
+
+from cinder.api import extensions
+from cinder.api.openstack import wsgi
+from cinder import exception
+from cinder.i18n import _LI
+from cinder import volume
+
+LOG = logging.getLogger(__name__)
+authorize = extensions.extension_authorizer('snapshot', 'snapshot_unmanage')
+
+
+class SnapshotUnmanageController(wsgi.Controller):
+    def __init__(self, *args, **kwargs):
+        super(SnapshotUnmanageController, self).__init__(*args, **kwargs)
+        self.volume_api = volume.API()
+
+    @wsgi.response(202)
+    @wsgi.action('os-unmanage')
+    def unmanage(self, req, id, body):
+        """Stop managing a snapshot.
+
+        This action is very much like a delete, except that a different
+        method (unmanage) is called on the Cinder driver.  This has the effect
+        of removing the snapshot from Cinder management without actually
+        removing the backend storage object associated with it.
+
+        There are no required parameters.
+
+        A Not Found error is returned if the specified snapshot does not exist.
+        """
+        context = req.environ['cinder.context']
+        authorize(context)
+
+        LOG.info(_LI("Unmanage snapshot with id: %s"), id, context=context)
+
+        try:
+            snapshot = self.volume_api.get_snapshot(context, id)
+            self.volume_api.delete_snapshot(context, snapshot,
+                                            unmanage_only=True)
+        except exception.SnapshotNotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidSnapshot as ex:
+            raise exc.HTTPBadRequest(explanation=ex.msg)
+        return webob.Response(status_int=202)
+
+
+class Snapshot_unmanage(extensions.ExtensionDescriptor):
+    """Enable volume unmanage operation."""
+
+    name = "SnapshotUnmanage"
+    alias = "os-snapshot-unmanage"
+    namespace = ('http://docs.openstack.org/snapshot/ext/snapshot-unmanage'
+                 '/api/v1')
+    updated = "2014-12-31T00:00:00+00:00"
+
+    def get_controller_extensions(self):
+        controller = SnapshotUnmanageController()
+        extension = extensions.ControllerExtension(self, 'snapshots',
+                                                   controller)
+        return [extension]
diff --git a/cinder/tests/unit/api/contrib/test_snapshot_manage.py b/cinder/tests/unit/api/contrib/test_snapshot_manage.py
new file mode 100644 (file)
index 0000000..500c01a
--- /dev/null
@@ -0,0 +1,124 @@
+#   Copyright (c) 2015 Huawei Technologies Co., Ltd.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+
+import mock
+from oslo_serialization import jsonutils
+import webob
+
+from cinder import context
+from cinder import exception
+from cinder import test
+from cinder.tests.unit.api import fakes
+
+
+def app():
+    # no auth, just let environ['cinder.context'] pass through
+    api = fakes.router.APIRouter()
+    mapper = fakes.urlmap.URLMap()
+    mapper['/v2'] = api
+    return mapper
+
+
+def volume_get(self, context, volume_id, viewable_admin_meta=False):
+    if volume_id == 'fake_volume_id':
+        return {'id': 'fake_volume_id', 'name': 'fake_volume_name',
+                'host': 'fake_host'}
+    raise exception.VolumeNotFound(volume_id=volume_id)
+
+
+@mock.patch('cinder.volume.api.API.get', volume_get)
+class SnapshotManageTest(test.TestCase):
+    """Test cases for cinder/api/contrib/snapshot_manage.py
+
+    The API extension adds a POST /os-snapshot-manage API that is passed a
+    cinder volume id, and a driver-specific reference parameter.
+    If everything is passed correctly,
+    then the cinder.volume.api.API.manage_existing_snapshot method
+    is invoked to manage an existing storage object on the host.
+
+    In this set of test cases, we are ensuring that the code correctly parses
+    the request structure and raises the correct exceptions when things are not
+    right, and calls down into cinder.volume.api.API.manage_existing_snapshot
+    with the correct arguments.
+    """
+
+    def _get_resp(self, body):
+        """Helper to execute an os-snapshot-manage API call."""
+        req = webob.Request.blank('/v2/fake/os-snapshot-manage')
+        req.method = 'POST'
+        req.headers['Content-Type'] = 'application/json'
+        req.environ['cinder.context'] = context.RequestContext('admin',
+                                                               'fake',
+                                                               True)
+        req.body = jsonutils.dumps(body)
+        res = req.get_response(app())
+        return res
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
+    @mock.patch('cinder.volume.api.API.create_snapshot_in_db')
+    @mock.patch('cinder.db.service_get_by_host_and_topic')
+    def test_manage_snapshot_ok(self, mock_db,
+                                mock_create_snapshot, mock_rpcapi):
+        """Test successful manage volume execution.
+
+        Tests for correct operation when valid arguments are passed in the
+        request body. We ensure that cinder.volume.api.API.manage_existing got
+        called with the correct arguments, and that we return the correct HTTP
+        code to the caller.
+        """
+        body = {'snapshot': {'volume_id': 'fake_volume_id', 'ref': 'fake_ref'}}
+        res = self._get_resp(body)
+        self.assertEqual(202, res.status_int, res)
+
+        # Check the db.service_get_by_host_and_topic was called with correct
+        # arguments.
+        self.assertEqual(1, mock_db.call_count)
+        args = mock_db.call_args[0]
+        self.assertEqual('fake_host', args[1])
+
+        # Check the create_snapshot_in_db was called with correct arguments.
+        self.assertEqual(1, mock_create_snapshot.call_count)
+        args = mock_create_snapshot.call_args[0]
+        self.assertEqual('fake_volume_id', args[1].get('id'))
+
+        # Check the volume_rpcapi.manage_existing_snapshot was called with
+        # correct arguments.
+        self.assertEqual(1, mock_rpcapi.call_count)
+        args = mock_rpcapi.call_args[0]
+        self.assertEqual('fake_ref', args[2])
+
+    def test_manage_snapshot_missing_volume_id(self):
+        """Test correct failure when volume_id is not specified."""
+        body = {'snapshot': {'ref': 'fake_ref'}}
+        res = self._get_resp(body)
+        self.assertEqual(400, res.status_int)
+
+    def test_manage_snapshot_missing_ref(self):
+        """Test correct failure when the ref is not specified."""
+        body = {'snapshot': {'volume_id': 'fake_volume_id'}}
+        res = self._get_resp(body)
+        self.assertEqual(400, res.status_int)
+
+    def test_manage_snapshot_error_body(self):
+        """Test correct failure when body is invaild."""
+        body = {'error_snapshot': {'volume_id': 'fake_volume_id'}}
+        res = self._get_resp(body)
+        self.assertEqual(400, res.status_int)
+
+    def test_manage_snapshot_error_volume_id(self):
+        """Test correct failure when volume can't be found."""
+        body = {'snapshot': {'volume_id': 'error_volume_id',
+                             'ref': 'fake_ref'}}
+        res = self._get_resp(body)
+        self.assertEqual(404, res.status_int)
diff --git a/cinder/tests/unit/api/contrib/test_snapshot_unmanage.py b/cinder/tests/unit/api/contrib/test_snapshot_unmanage.py
new file mode 100644 (file)
index 0000000..4ce70f1
--- /dev/null
@@ -0,0 +1,113 @@
+#   Copyright (c) 2015 Huawei Technologies Co., Ltd.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+
+import mock
+from oslo_serialization import jsonutils
+import webob
+
+from cinder import context
+from cinder import exception
+from cinder import test
+from cinder.tests.unit.api import fakes
+from cinder.tests.unit import fake_snapshot
+from cinder.tests.unit import fake_volume
+
+
+# This list of fake snapshot is used by our tests.
+snapshot_id = 'ffffffff-0000-ffff-0000-ffffffffffff'
+bad_snp_id = 'ffffffff-0000-ffff-0000-fffffffffffe'
+
+
+def app():
+    # no auth, just let environ['cinder.context'] pass through
+    api = fakes.router.APIRouter()
+    mapper = fakes.urlmap.URLMap()
+    mapper['/v2'] = api
+    return mapper
+
+
+def api_snapshot_get(self, context, snp_id):
+    """Replacement for cinder.volume.api.API.get_snapshot.
+
+    We stub the cinder.volume.api.API.get_snapshot method to check for the
+    existence of snapshot_id in our list of fake snapshots and raise an
+    exception if the specified snapshot ID is not in our list.
+    """
+    snapshot = {'id': 'ffffffff-0000-ffff-0000-ffffffffffff',
+                'progress': '100%',
+                'volume_id': 'fake_volume_id',
+                'project_id': 'fake_project',
+                'status': 'available'}
+    if snp_id == snapshot_id:
+        snapshot_objct = fake_snapshot.fake_snapshot_obj(context, **snapshot)
+        return snapshot_objct
+    else:
+        raise exception.SnapshotNotFound(snapshot_id=snp_id)
+
+
+@mock.patch('cinder.volume.api.API.get_snapshot', api_snapshot_get)
+class SnapshotUnmanageTest(test.TestCase):
+    """Test cases for cinder/api/contrib/snapshot_unmanage.py
+
+    The API extension adds an action to snapshots, "os-unmanage", which will
+    effectively issue a delete operation on the snapshot, but with a flag set
+    that means that a different method will be invoked on the driver, so that
+    the snapshot is not actually deleted in the storage backend.
+
+    In this set of test cases, we are ensuring that the code correctly parses
+    the request structure and raises the correct exceptions when things are not
+    right, and calls down into cinder.volume.api.API.delete_snapshot with the
+    correct arguments.
+    """
+
+    def _get_resp(self, snapshot_id):
+        """Helper to build an os-unmanage req for the specified snapshot_id."""
+        req = webob.Request.blank('/v2/fake/snapshots/%s/action' % snapshot_id)
+        req.method = 'POST'
+        req.headers['Content-Type'] = 'application/json'
+        req.environ['cinder.context'] = context.RequestContext('admin',
+                                                               'fake',
+                                                               True)
+        body = {'os-unmanage': ''}
+        req.body = jsonutils.dumps(body)
+        res = req.get_response(app())
+        return res
+
+    @mock.patch('cinder.db.snapshot_update')
+    @mock.patch('cinder.objects.volume.Volume.get_by_id')
+    @mock.patch('cinder.db.volume_get')
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.delete_snapshot')
+    def test_unmanage_snapshot_ok(self, mock_rpcapi, mock_db,
+                                  mock_volume_get_by_id, mock_db_update):
+        """Return success for valid and unattached volume."""
+        ctxt = context.RequestContext('admin', 'fake', True)
+        volume = fake_volume.fake_volume_obj(ctxt, id='fake_volume_id')
+        mock_volume_get_by_id.return_value = volume
+        res = self._get_resp(snapshot_id)
+
+        self.assertEqual(1, mock_db.call_count)
+        self.assertEqual(2, len(mock_db.call_args[0]), mock_db.call_args)
+        self.assertEqual('fake_volume_id', mock_db.call_args[0][1])
+
+        self.assertEqual(1, mock_rpcapi.call_count)
+        self.assertEqual(3, len(mock_rpcapi.call_args[0]))
+        self.assertEqual(1, len(mock_rpcapi.call_args[1]))
+        self.assertTrue(mock_rpcapi.call_args[1]['unmanage_only'])
+
+        self.assertEqual(202, res.status_int, res)
+
+    def test_unmanage_snapshot_bad_snapshot_id(self):
+        """Return 404 if the volume does not exist."""
+        res = self._get_resp(bad_snp_id)
+        self.assertEqual(404, res.status_int, res)
index 0948d3dd0afe9954423fb1109b4bfc567de99799..05b27d16deba88500cdbf36bb47d102c93591b2c 100644 (file)
@@ -74,6 +74,8 @@
     "limits_extension:used_limits": "",
 
     "snapshot_extension:snapshot_actions:update_snapshot_status": "",
+    "snapshot_extension:snapshot_manage": "rule:admin_api",
+    "snapshot_extension:snapshot_unmanage": "rule:admin_api",
 
     "volume:create_transfer": "",
     "volume:accept_transfer": "",
index 45f2bb8205e7a89e5ed9c24c47b2b94eb9f58d53..de71c9a964c5de5310810e81275ac9d4f50833a1 100644 (file)
@@ -6283,6 +6283,66 @@ class LVMISCSIVolumeDriverTestCase(DriverTestCase):
                           self.volume.driver.manage_existing_get_size,
                           vol, ref)
 
+    def test_lvm_manage_existing_snapshot(self):
+        """Good pass on managing an LVM snapshot.
+
+        This test case ensures that, when a logical volume's snapshot with the
+        specified name exists, and the size is as expected, no error is
+        returned from driver.manage_existing_snapshot, and that the
+        rename_volume function is called in the Brick LVM code with the correct
+        arguments.
+        """
+        self._setup_stubs_for_manage_existing()
+
+        ref = {'source-name': 'fake_lv'}
+        snp = {'name': 'test', 'id': 1, 'size': 0}
+
+        def _rename_volume(old_name, new_name):
+            self.assertEqual(ref['source-name'], old_name)
+            self.assertEqual(snp['name'], new_name)
+
+        with mock.patch.object(self.volume.driver.vg, 'rename_volume') as \
+                mock_rename_volume:
+            mock_rename_volume.return_value = _rename_volume
+
+            size = self.volume.driver.manage_existing_snapshot_get_size(snp,
+                                                                        ref)
+            self.assertEqual(2, size)
+            model_update = self.volume.driver.manage_existing_snapshot(snp,
+                                                                       ref)
+            self.assertIsNone(model_update)
+
+    def test_lvm_manage_existing_snapshot_bad_ref(self):
+        """Error case where specified LV snapshot doesn't exist.
+
+        This test case ensures that the correct exception is raised when
+        the caller attempts to manage a snapshot that does not exist.
+        """
+        self._setup_stubs_for_manage_existing()
+
+        ref = {'source-name': 'fake_nonexistent_lv'}
+        snp = {'name': 'test', 'id': 1, 'size': 0, 'status': 'available'}
+
+        self.assertRaises(exception.ManageExistingInvalidReference,
+                          self.volume.driver.manage_existing_snapshot_get_size,
+                          snp, ref)
+
+    def test_lvm_manage_existing_snapshot_bad_size(self):
+        """Make sure correct exception on bad size returned from LVM.
+
+        This test case ensures that the correct exception is raised when
+        the information returned for the existing LVs is not in the format
+        that the manage_existing_snapshot code expects.
+        """
+        self._setup_stubs_for_manage_existing()
+
+        ref = {'source-name': 'fake_lv_bad_size'}
+        snp = {'name': 'test', 'id': 1, 'size': 2}
+
+        self.assertRaises(exception.VolumeBackendAPIException,
+                          self.volume.driver.manage_existing_snapshot_get_size,
+                          snp, ref)
+
 
 class LVMVolumeDriverTestCase(DriverTestCase):
     """Test case for VolumeDriver"""
index 546ca5dc323cb261f1fbfafdf4ad5a68dc1fee27..cca2658db1cc13ffcda24e8ebfd1a530d25dcf9a 100644 (file)
@@ -25,6 +25,7 @@ from cinder import db
 from cinder import objects
 from cinder import test
 from cinder.tests.unit import fake_snapshot
+from cinder.tests.unit import fake_volume
 from cinder.tests.unit import utils as tests_utils
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils
@@ -150,6 +151,8 @@ class VolumeRpcAPITestCase(test.TestCase):
             host = kwargs['host']
         elif 'group' in kwargs:
             host = kwargs['group']['host']
+        elif 'volume' not in kwargs and 'snapshot' in kwargs:
+            host = 'fake_host'
         else:
             host = kwargs['volume']['host']
 
@@ -231,7 +234,15 @@ class VolumeRpcAPITestCase(test.TestCase):
         self._test_volume_api('delete_snapshot',
                               rpc_method='cast',
                               snapshot=self.fake_snapshot_obj,
-                              host='fake_host')
+                              host='fake_host',
+                              unmanage_only=False)
+
+    def test_delete_snapshot_with_unmanage_only(self):
+        self._test_volume_api('delete_snapshot',
+                              rpc_method='cast',
+                              snapshot=self.fake_snapshot_obj,
+                              host='fake_host',
+                              unmanage_only=True)
 
     def test_attach_volume_to_instance(self):
         self._test_volume_api('attach_volume',
@@ -343,6 +354,27 @@ class VolumeRpcAPITestCase(test.TestCase):
                               ref={'lv_name': 'foo'},
                               version='1.15')
 
+    def test_manage_existing_snapshot(self):
+        volume_update = {'host': 'fake_host'}
+        snpshot = {
+            'id': 1,
+            'volume_id': 'fake_id',
+            'status': "creating",
+            'progress': '0%',
+            'volume_size': 0,
+            'display_name': 'fake_name',
+            'display_description': 'fake_description',
+            'volume': fake_volume.fake_db_volume(**volume_update),
+            'expected_attrs': ['volume'], }
+        my_fake_snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
+                                                               **snpshot)
+        self._test_volume_api('manage_existing_snapshot',
+                              rpc_method='cast',
+                              snapshot=my_fake_snapshot_obj,
+                              ref='foo',
+                              host='fake_host',
+                              version='1.28')
+
     def test_promote_replica(self):
         self._test_volume_api('promote_replica',
                               rpc_method='cast',
index 41b1a0add3f8509b3d26a5b676a838f289b88dbd..fbf98eada339d2c4060a0ed2e12001a3a9bf85f1 100644 (file)
@@ -22,6 +22,7 @@ import mock
 from oslo_concurrency import processutils
 from oslo_config import cfg
 
+from cinder import context
 from cinder import exception
 from cinder import test
 from cinder import utils
@@ -802,3 +803,20 @@ class VolumeUtilsTestCase(test.TestCase):
         self.assertEqual(
             expected_dict,
             volume_utils.convert_config_string_to_dict(test_string))
+
+    def test_process_reserve_over_quota(self):
+        ctxt = context.get_admin_context()
+        ctxt.project_id = 'fake'
+        overs_one = ['gigabytes']
+        over_two = ['snapshots']
+        usages = {'gigabytes': {'reserved': 1, 'in_use': 9},
+                  'snapshots': {'reserved': 1, 'in_use': 9}}
+        quotas = {'gigabytes': 10, 'snapshots': 10}
+        size = 1
+
+        self.assertRaises(exception.VolumeSizeExceedsAvailableQuota,
+                          volume_utils.process_reserve_over_quota,
+                          ctxt, overs_one, usages, quotas, size)
+        self.assertRaises(exception.SnapshotLimitExceeded,
+                          volume_utils.process_reserve_over_quota,
+                          ctxt, over_two, usages, quotas, size)
index af64a635329d84e9f138e7cb97984e7945ccf1d9..3c742630b3d8e46f26d04171e67498552967a0d0 100644 (file)
@@ -837,32 +837,8 @@ class API(base.Base):
             overs = e.kwargs['overs']
             usages = e.kwargs['usages']
             quotas = e.kwargs['quotas']
-
-            def _consumed(name):
-                return (usages[name]['reserved'] + usages[name]['in_use'])
-
-            for over in overs:
-                if 'gigabytes' in over:
-                    msg = _LW("Quota exceeded for %(s_pid)s, tried to create "
-                              "%(s_size)sG snapshot (%(d_consumed)dG of "
-                              "%(d_quota)dG already consumed).")
-                    LOG.warning(msg, {'s_pid': context.project_id,
-                                      's_size': volume['size'],
-                                      'd_consumed': _consumed(over),
-                                      'd_quota': quotas[over]})
-                    raise exception.VolumeSizeExceedsAvailableQuota(
-                        requested=volume['size'],
-                        consumed=_consumed('gigabytes'),
-                        quota=quotas['gigabytes'])
-                elif 'snapshots' in over:
-                    msg = _LW("Quota exceeded for %(s_pid)s, tried to create "
-                              "snapshot (%(d_consumed)d snapshots "
-                              "already consumed).")
-
-                    LOG.warning(msg, {'s_pid': context.project_id,
-                                      'd_consumed': _consumed(over)})
-                    raise exception.SnapshotLimitExceeded(
-                        allowed=quotas[over])
+            volume_utils.process_reserve_over_quota(context, overs, usages,
+                                                    quotas, volume['size'])
 
         return reservations
 
@@ -901,7 +877,8 @@ class API(base.Base):
         return result
 
     @wrap_check_policy
-    def delete_snapshot(self, context, snapshot, force=False):
+    def delete_snapshot(self, context, snapshot, force=False,
+                        unmanage_only=False):
         if not force and snapshot['status'] not in ["available", "error"]:
             LOG.error(_LE('Unable to delete snapshot: %(snap_id)s, '
                           'due to invalid status. '
@@ -924,7 +901,8 @@ class API(base.Base):
 
         volume = self.db.volume_get(context, snapshot_obj.volume_id)
         self.volume_rpcapi.delete_snapshot(context, snapshot_obj,
-                                           volume['host'])
+                                           volume['host'],
+                                           unmanage_only=unmanage_only)
         LOG.info(_LI("Snapshot delete request issued successfully."),
                  resource=snapshot)
 
@@ -1471,7 +1449,9 @@ class API(base.Base):
                     elevated, svc_host, CONF.volume_topic)
             except exception.ServiceNotFound:
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_LE('Unable to find service for given host.'))
+                    LOG.error(_LE('Unable to find service: %(service)s for '
+                                  'given host: %(host)s.'),
+                              {'service': CONF.volume_topic, 'host': host})
             availability_zone = service.get('availability_zone')
 
         manage_what = {
@@ -1505,6 +1485,26 @@ class API(base.Base):
                      resource=vol_ref)
             return vol_ref
 
+    def manage_existing_snapshot(self, context, ref, volume,
+                                 name=None, description=None,
+                                 metadata=None):
+        host = volume_utils.extract_host(volume['host'])
+        try:
+            self.db.service_get_by_host_and_topic(
+                context.elevated(), host, CONF.volume_topic)
+        except exception.ServiceNotFound:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE('Unable to find service: %(service)s for '
+                              'given host: %(host)s.'),
+                          {'service': CONF.volume_topic, 'host': host})
+
+        snapshot_object = self.create_snapshot_in_db(context, volume, name,
+                                                     description, False,
+                                                     metadata, None)
+        self.volume_rpcapi.manage_existing_snapshot(context, snapshot_object,
+                                                    ref, host)
+        return snapshot_object
+
     #  Replication V2 methods ##
 
     # NOTE(jdg): It might be kinda silly to propogate the named
index 9dd83fb4b639695fd28122a606bf67a3fa3eabbe..7b66efcb837ed5999ba77b3b09f47ffefd4c380f 100644 (file)
@@ -1594,6 +1594,56 @@ class ReplicaV2VD(object):
         raise NotImplementedError()
 
 
+@six.add_metaclass(abc.ABCMeta)
+class ManageableSnapshotsVD(object):
+    # NOTE: Can't use abstractmethod before all drivers implement it
+    def manage_existing_snapshot(self, snapshot, existing_ref):
+        """Brings an existing backend storage object under Cinder management.
+
+        existing_ref is passed straight through from the API request's
+        manage_existing_ref value, and it is up to the driver how this should
+        be interpreted.  It should be sufficient to identify a storage object
+        that the driver should somehow associate with the newly-created cinder
+        snapshot structure.
+
+        There are two ways to do this:
+
+        1. Rename the backend storage object so that it matches the
+           snapshot['name'] which is how drivers traditionally map between a
+           cinder snapshot and the associated backend storage object.
+
+        2. Place some metadata on the snapshot, or somewhere in the backend,
+           that allows other driver requests (e.g. delete) to locate the
+           backend storage object when required.
+
+        If the existing_ref doesn't make sense, or doesn't refer to an existing
+        backend storage object, raise a ManageExistingInvalidReference
+        exception.
+        """
+        return
+
+    # NOTE: Can't use abstractmethod before all drivers implement it
+    def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
+        """Return size of snapshot to be managed by manage_existing.
+
+        When calculating the size, round up to the next GB.
+        """
+        return
+
+    # NOTE: Can't use abstractmethod before all drivers implement it
+    def unmanage_snapshot(self, snapshot):
+        """Removes the specified snapshot from Cinder management.
+
+        Does not delete the underlying backend storage object.
+
+        For most drivers, this will not need to do anything. However, some
+        drivers might use this call as an opportunity to clean up any
+        Cinder-specific configuration that they have associated with the
+        backend storage object.
+        """
+        pass
+
+
 @six.add_metaclass(abc.ABCMeta)
 class ReplicaVD(object):
     @abc.abstractmethod
@@ -1681,8 +1731,8 @@ class ReplicaVD(object):
 
 
 class VolumeDriver(ConsistencyGroupVD, TransferVD, ManageableVD, ExtendVD,
-                   CloneableVD, CloneableImageVD, SnapshotVD, ReplicaVD,
-                   LocalVD, MigrateVD, BaseVD):
+                   CloneableVD, CloneableImageVD, ManageableSnapshotsVD,
+                   SnapshotVD, ReplicaVD, LocalVD, MigrateVD, BaseVD):
     """This class will be deprecated soon.
 
     Please use the abstract classes above for new drivers.
@@ -1733,6 +1783,17 @@ class VolumeDriver(ConsistencyGroupVD, TransferVD, ManageableVD, ExtendVD,
         msg = _("Unmanage volume not implemented.")
         raise NotImplementedError(msg)
 
+    def manage_existing_snapshot(self, snapshot, existing_ref):
+        msg = _("Manage existing snapshot not implemented.")
+        raise NotImplementedError(msg)
+
+    def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
+        msg = _("Manage existing snapshot not implemented.")
+        raise NotImplementedError(msg)
+
+    def unmanage_snapshot(self, snapshot):
+        """Unmanage the specified snapshot from Cinder management."""
+
     def retype(self, context, volume, new_type, diff, host):
         return False, None
 
index d1e2f50589f5c8698999ad5708691257706e6d4f..6786439c0656da402171deec8f0337d2335002e5 100644 (file)
@@ -567,8 +567,9 @@ class LVMVolumeDriver(driver.VolumeDriver):
             raise exception.VolumeBackendAPIException(
                 data=exception_message)
 
-    def manage_existing_get_size(self, volume, existing_ref):
-        """Return size of an existing LV for manage_existing.
+    def manage_existing_object_get_size(self, existing_object, existing_ref,
+                                        object_type):
+        """Return size of an existing LV for manage existing volume/snapshot.
 
         existing_ref is a dictionary of the form:
         {'source-name': <name of LV>}
@@ -593,15 +594,33 @@ class LVMVolumeDriver(driver.VolumeDriver):
         try:
             lv_size = int(math.ceil(float(lv['size'])))
         except ValueError:
-            exception_message = (_("Failed to manage existing volume "
+            exception_message = (_("Failed to manage existing %(type)s "
                                    "%(name)s, because reported size %(size)s "
                                    "was not a floating-point number.")
-                                 % {'name': lv_name,
+                                 % {'type': object_type,
+                                    'name': lv_name,
                                     'size': lv['size']})
             raise exception.VolumeBackendAPIException(
                 data=exception_message)
         return lv_size
 
+    def manage_existing_get_size(self, volume, existing_ref):
+        return self.manage_existing_object_get_size(volume, existing_ref,
+                                                    "volume")
+
+    def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
+        if not isinstance(existing_ref, dict):
+            existing_ref = {"source-name": existing_ref}
+        return self.manage_existing_object_get_size(snapshot, existing_ref,
+                                                    "snapshot")
+
+    def manage_existing_snapshot(self, snapshot, existing_ref):
+        dest_name = self._escape_snapshot(snapshot['name'])
+        snapshot_temp = {"name": dest_name}
+        if not isinstance(existing_ref, dict):
+            existing_ref = {"source-name": existing_ref}
+        return self.manage_existing(snapshot_temp, existing_ref)
+
     def migrate_volume(self, ctxt, volume, host, thin=False, mirror_count=0):
         """Optimize the migration if the destination is on the same server.
 
index a361de4e1dde17b41f3ba3d397addda72714ea26..3886935a0aa28f3efae4eb087a660a7f8e16125a 100644 (file)
@@ -21,6 +21,7 @@ import six
 
 from cinder import exception
 from cinder.i18n import _LE
+from cinder import objects
 
 LOG = logging.getLogger(__name__)
 
@@ -64,32 +65,45 @@ def restore_source_status(context, db, volume_spec):
                        'source_volid': source_volid})
 
 
-def error_out_volume(context, db, volume_id, reason=None):
+def _clean_reason(reason):
+    if reason is None:
+        return '???'
+    reason = six.text_type(reason)
+    if len(reason) <= REASON_LENGTH:
+        return reason
+    else:
+        return reason[0:REASON_LENGTH] + '...'
 
-    def _clean_reason(reason):
-        if reason is None:
-            return '???'
-        reason = six.text_type(reason)
-        if len(reason) <= REASON_LENGTH:
-            return reason
-        else:
-            return reason[0:REASON_LENGTH] + '...'
 
+def _update_object(context, db, status, reason, object_type, object_id):
     update = {
-        'status': 'error',
+        'status': status,
     }
-    reason = _clean_reason(reason)
-    # TODO(harlowja): re-enable when we can support this in the database.
-    # if reason:
-    #     status['details'] = reason
     try:
-        LOG.debug('Updating volume: %(volume_id)s with %(update)s'
-                  ' due to: %(reason)s' % {'volume_id': volume_id,
-                                           'reason': reason,
-                                           'update': update})
-        db.volume_update(context, volume_id, update)
+        LOG.debug('Updating %(object_type)s: %(object_id)s with %(update)s'
+                  ' due to: %(reason)s', {'object_type': object_type,
+                                          'object_id': object_id,
+                                          'reason': reason,
+                                          'update': update})
+        if object_type == 'volume':
+            db.volume_update(context, object_id, update)
+        elif object_type == 'snapshot':
+            snapshot = objects.Snapshot.get_by_id(context, object_id)
+            snapshot.update(update)
+            snapshot.save()
     except exception.CinderException:
         # Don't let this cause further exceptions.
-        LOG.exception(_LE("Failed updating volume %(volume_id)s with"
-                          " %(update)s") % {'volume_id': volume_id,
-                                            'update': update})
+        LOG.exception(_LE("Failed updating %(object_type)s %(object_id)s with"
+                          " %(update)s"), {'object_type': object_type,
+                                           'object_id': object_id,
+                                           'update': update})
+
+
+def error_out_volume(context, db, volume_id, reason=None):
+    reason = _clean_reason(reason)
+    _update_object(context, db, 'error', reason, 'volume', volume_id)
+
+
+def error_out_snapshot(context, db, snapshot_id, reason=None):
+    reason = _clean_reason(reason)
+    _update_object(context, db, 'error', reason, 'snapshot', snapshot_id)
diff --git a/cinder/volume/flows/manager/manage_existing_snapshot.py b/cinder/volume/flows/manager/manage_existing_snapshot.py
new file mode 100644 (file)
index 0000000..96bc49b
--- /dev/null
@@ -0,0 +1,344 @@
+#   Copyright (c) 2015 Huawei Technologies Co., Ltd.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import taskflow.engines
+from taskflow.patterns import linear_flow
+from taskflow.types import failure as ft
+from taskflow.utils import misc
+
+from cinder import exception
+from cinder import flow_utils
+from cinder.i18n import _, _LE, _LI
+from cinder import objects
+from cinder import quota
+from cinder.volume.flows import common as flow_common
+from cinder.volume import utils as volume_utils
+
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+QUOTAS = quota.QUOTAS
+
+ACTION = 'snapshot:manage_existing'
+
+
+class ExtractSnapshotRefTask(flow_utils.CinderTask):
+    """Extracts snapshot reference for given snapshot id."""
+
+    default_provides = 'snapshot_ref'
+
+    def __init__(self, db):
+        super(ExtractSnapshotRefTask, self).__init__(addons=[ACTION])
+        self.db = db
+
+    def execute(self, context, snapshot_id):
+        # NOTE(wanghao): this will fetch the snapshot from the database, if
+        # the snapshot has been deleted before we got here then this should
+        # fail.
+        #
+        # In the future we might want to have a lock on the snapshot_id so that
+        # the snapshot can not be deleted while its still being created?
+        snapshot_ref = objects.Snapshot.get_by_id(context, snapshot_id)
+        LOG.debug("ExtractSnapshotRefTask return"
+                  " snapshot_ref: %s", snapshot_ref)
+        return snapshot_ref
+
+    def revert(self, context, snapshot_id, result, **kwargs):
+        if isinstance(result, misc.Failure):
+            return
+
+        flow_common.error_out_snapshot(context, self.db, snapshot_id)
+        LOG.error(_LE("Snapshot %s: create failed"), snapshot_id)
+
+
+class NotifySnapshotActionTask(flow_utils.CinderTask):
+    """Performs a notification about the given snapshot when called.
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db, event_suffix, host):
+        super(NotifySnapshotActionTask, self).__init__(addons=[ACTION,
+                                                               event_suffix])
+        self.db = db
+        self.event_suffix = event_suffix
+        self.host = host
+
+    def execute(self, context, snapshot_ref):
+        snapshot_id = snapshot_ref['id']
+        try:
+            volume_utils.notify_about_snapshot_usage(context, snapshot_ref,
+                                                     self.event_suffix,
+                                                     host=self.host)
+        except exception.CinderException:
+            # If notification sending of snapshot database entry reading fails
+            # then we shouldn't error out the whole workflow since this is
+            # not always information that must be sent for snapshots to operate
+            LOG.exception(_LE("Failed notifying about the snapshot "
+                              "action %(event)s for snapshot %(snp_id)s."),
+                          {'event': self.event_suffix,
+                           'snp_id': snapshot_id})
+
+
+class PrepareForQuotaReservationTask(flow_utils.CinderTask):
+    """Gets the snapshot size from the driver."""
+
+    default_provides = set(['size', 'snapshot_properties'])
+
+    def __init__(self, db, driver):
+        super(PrepareForQuotaReservationTask, self).__init__(addons=[ACTION])
+        self.db = db
+        self.driver = driver
+
+    def execute(self, context, snapshot_ref, manage_existing_ref):
+        snapshot_id = snapshot_ref['id']
+        if not self.driver.initialized:
+            driver_name = (self.driver.configuration.
+                           safe_get('volume_backend_name'))
+            LOG.error(_LE("Unable to manage existing snapshot. "
+                          "Volume driver %s not initialized."), driver_name)
+            flow_common.error_out_snapshot(context, self.db, snapshot_id,
+                                           reason=_("Volume driver %s "
+                                                    "not initialized.") %
+                                           driver_name)
+            raise exception.DriverNotInitialized()
+
+        size = self.driver.manage_existing_snapshot_get_size(
+            snapshot=snapshot_ref,
+            existing_ref=manage_existing_ref)
+
+        return {'size': size,
+                'snapshot_properties': snapshot_ref}
+
+
+class QuotaReserveTask(flow_utils.CinderTask):
+    """Reserves a single snapshot with the given size.
+
+    Reversion strategy: rollback the quota reservation.
+
+    Warning Warning: if the process that is running this reserve and commit
+    process fails (or is killed before the quota is rolled back or committed
+    it does appear like the quota will never be rolled back). This makes
+    software upgrades hard (inflight operations will need to be stopped or
+    allowed to complete before the upgrade can occur). *In the future* when
+    taskflow has persistence built-in this should be easier to correct via
+    an automated or manual process.
+    """
+
+    default_provides = set(['reservations'])
+
+    def __init__(self):
+        super(QuotaReserveTask, self).__init__(addons=[ACTION])
+
+    def execute(self, context, size, optional_args):
+        try:
+            if CONF.no_snapshot_gb_quota:
+                reserve_opts = {'snapshots': 1}
+            else:
+                reserve_opts = {'snapshots': 1, 'gigabytes': size}
+            reservations = QUOTAS.reserve(context, **reserve_opts)
+            return {
+                'reservations': reservations,
+            }
+        except exception.OverQuota as e:
+            overs = e.kwargs['overs']
+            quotas = e.kwargs['quotas']
+            usages = e.kwargs['usages']
+            volume_utils.process_reserve_over_quota(context, overs, usages,
+                                                    quotas, size)
+
+    def revert(self, context, result, optional_args, **kwargs):
+        # We never produced a result and therefore can't destroy anything.
+        if isinstance(result, misc.Failure):
+            return
+
+        if optional_args['is_quota_committed']:
+            # The reservations have already been committed and can not be
+            # rolled back at this point.
+            return
+        # We actually produced an output that we can revert so lets attempt
+        # to use said output to rollback the reservation.
+        reservations = result['reservations']
+        try:
+            QUOTAS.rollback(context, reservations)
+        except exception.CinderException:
+            # We are already reverting, therefore we should silence this
+            # exception since a second exception being active will be bad.
+            LOG.exception(_LE("Failed rolling back quota for"
+                              " %s reservations."), reservations)
+
+
+class QuotaCommitTask(flow_utils.CinderTask):
+    """Commits the reservation.
+
+    Reversion strategy: N/A (the rollback will be handled by the task that did
+    the initial reservation (see: QuotaReserveTask).
+
+    Warning Warning: if the process that is running this reserve and commit
+    process fails (or is killed before the quota is rolled back or committed
+    it does appear like the quota will never be rolled back). This makes
+    software upgrades hard (inflight operations will need to be stopped or
+    allowed to complete before the upgrade can occur). *In the future* when
+    taskflow has persistence built-in this should be easier to correct via
+    an automated or manual process.
+    """
+
+    def __init__(self):
+        super(QuotaCommitTask, self).__init__(addons=[ACTION])
+
+    def execute(self, context, reservations, snapshot_properties,
+                optional_args):
+        QUOTAS.commit(context, reservations)
+        # updating is_quota_committed attribute of optional_args dictionary
+        optional_args['is_quota_committed'] = True
+        return {'snapshot_properties': snapshot_properties}
+
+    def revert(self, context, result, **kwargs):
+        # We never produced a result and therefore can't destroy anything.
+        if isinstance(result, ft.Failure):
+            return
+        snapshot = result['snapshot_properties']
+        try:
+            reserve_opts = {'snapshots': -1,
+                            'gigabytes': -snapshot['volume_size']}
+            reservations = QUOTAS.reserve(context,
+                                          project_id=context.project_id,
+                                          **reserve_opts)
+            if reservations:
+                QUOTAS.commit(context, reservations,
+                              project_id=context.project_id)
+        except Exception:
+            LOG.exception(_LE("Failed to update quota while deleting "
+                              "snapshots: %s"), snapshot['id'])
+
+
+class ManageExistingTask(flow_utils.CinderTask):
+    """Brings an existing snapshot under Cinder management."""
+
+    default_provides = set(['snapshot', 'new_status'])
+
+    def __init__(self, db, driver):
+        super(ManageExistingTask, self).__init__(addons=[ACTION])
+        self.db = db
+        self.driver = driver
+
+    def execute(self, context, snapshot_ref, manage_existing_ref, size):
+        model_update = self.driver.manage_existing_snapshot(
+            snapshot=snapshot_ref,
+            existing_ref=manage_existing_ref)
+        if not model_update:
+            model_update = {}
+        model_update.update({'size': size})
+        try:
+            snapshot_object = objects.Snapshot.get_by_id(context,
+                                                         snapshot_ref['id'])
+            snapshot_object.update(model_update)
+            snapshot_object.save()
+        except exception.CinderException:
+            LOG.exception(_LE("Failed updating model of snapshot "
+                              "%(snapshot_id)s with creation provided model "
+                              "%(model)s."),
+                          {'snapshot_id': snapshot_ref['id'],
+                           'model': model_update})
+            raise
+
+        return {'snapshot': snapshot_ref,
+                'new_status': 'available'}
+
+
+class CreateSnapshotOnFinishTask(NotifySnapshotActionTask):
+    """Perform final snapshot actions.
+
+    When a snapshot is created successfully it is expected that MQ
+    notifications and database updates will occur to 'signal' to others that
+    the snapshot is now ready for usage. This task does those notifications and
+    updates in a reliable manner (not re-raising exceptions if said actions can
+    not be triggered).
+
+    Reversion strategy: N/A
+    """
+
+    def __init__(self, db, event_suffix, host):
+        super(CreateSnapshotOnFinishTask, self).__init__(db, event_suffix,
+                                                         host)
+
+    def execute(self, context, snapshot, new_status):
+        LOG.debug("Begin to call CreateSnapshotOnFinishTask execute.")
+        snapshot_id = snapshot['id']
+        LOG.debug("New status: %s", new_status)
+        update = {
+            'status': new_status
+        }
+        try:
+            # TODO(harlowja): is it acceptable to only log if this fails??
+            # or are there other side-effects that this will cause if the
+            # status isn't updated correctly (aka it will likely be stuck in
+            # 'building' if this fails)??
+            snapshot_object = objects.Snapshot.get_by_id(context,
+                                                         snapshot_id)
+            snapshot_object.update(update)
+            snapshot_object.save()
+            # Now use the parent to notify.
+            super(CreateSnapshotOnFinishTask, self).execute(context, snapshot)
+        except exception.CinderException:
+            LOG.exception(_LE("Failed updating snapshot %(snapshot_id)s with "
+                              "%(update)s."), {'snapshot_id': snapshot_id,
+                                               'update': update})
+        # Even if the update fails, the snapshot is ready.
+        LOG.info(_LI("Snapshot %s created successfully."), snapshot_id)
+
+
+def get_flow(context, db, driver, host, snapshot_id, ref):
+    """Constructs and returns the manager entry point flow."""
+
+    LOG.debug("Input parmeter: context=%(context)s, db=%(db)s,"
+              "driver=%(driver)s, host=%(host)s, "
+              "snapshot_id=(snapshot_id)s, ref=%(ref)s.",
+              {'context': context,
+               'db': db,
+               'driver': driver,
+               'host': host,
+               'snapshot_id': snapshot_id,
+               'ref': ref}
+              )
+    flow_name = ACTION.replace(":", "_") + "_manager"
+    snapshot_flow = linear_flow.Flow(flow_name)
+
+    # This injects the initial starting flow values into the workflow so that
+    # the dependency order of the tasks provides/requires can be correctly
+    # determined.
+    create_what = {
+        'context': context,
+        'snapshot_id': snapshot_id,
+        'manage_existing_ref': ref,
+        'optional_args': {'is_quota_committed': False}
+    }
+
+    notify_start_msg = "manage_existing_snapshot.start"
+    notify_end_msg = "manage_existing_snapshot.end"
+    snapshot_flow.add(ExtractSnapshotRefTask(db),
+                      NotifySnapshotActionTask(db, notify_start_msg,
+                                               host=host),
+                      PrepareForQuotaReservationTask(db, driver),
+                      QuotaReserveTask(),
+                      ManageExistingTask(db, driver),
+                      QuotaCommitTask(),
+                      CreateSnapshotOnFinishTask(db, notify_end_msg,
+                                                 host=host))
+    LOG.debug("Begin to return taskflow.engines."
+              "load(snapshot_flow,store=create_what).")
+    # Now load (but do not run) the flow using the provided initial data.
+    return taskflow.engines.load(snapshot_flow, store=create_what)
index cbbe984e3b517245c7031ab677c73a49aaefd303..3f571c5fcc629460616b559be5ea1747405ddda0 100644 (file)
@@ -65,6 +65,7 @@ from cinder import utils
 from cinder.volume import configuration as config
 from cinder.volume.flows.manager import create_volume
 from cinder.volume.flows.manager import manage_existing
+from cinder.volume.flows.manager import manage_existing_snapshot
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as vol_utils
 from cinder.volume import volume_types
@@ -189,7 +190,7 @@ def locked_snapshot_operation(f):
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.27'
+    RPC_API_VERSION = '1.28'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -722,7 +723,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         return snapshot.id
 
     @locked_snapshot_operation
-    def delete_snapshot(self, context, snapshot):
+    def delete_snapshot(self, context, snapshot, unmanage_only=False):
         """Deletes and unexports snapshot."""
         context = context.elevated()
         snapshot._context = context
@@ -742,7 +743,10 @@ class VolumeManager(manager.SchedulerDependentManager):
             snapshot.context = context
             snapshot.save()
 
-            self.driver.delete_snapshot(snapshot)
+            if unmanage_only:
+                self.driver.unmanage_snapshot(snapshot)
+            else:
+                self.driver.delete_snapshot(snapshot)
         except exception.SnapshotIsBusy:
             LOG.error(_LE("Delete snapshot failed, due to snapshot busy."),
                       resource=snapshot)
@@ -3040,3 +3044,25 @@ class VolumeManager(manager.SchedulerDependentManager):
             raise exception.VolumeBackendAPIException(data=err_msg)
 
         return replication_targets
+
+    def manage_existing_snapshot(self, ctxt, snapshot, ref=None):
+        LOG.debug('manage_existing_snapshot: managing %s.', ref)
+        try:
+            flow_engine = manage_existing_snapshot.get_flow(
+                ctxt,
+                self.db,
+                self.driver,
+                self.host,
+                snapshot.id,
+                ref)
+        except Exception:
+            msg = _LE("Failed to create manage_existing flow: "
+                      "%(object_type)s %(object_id)s.")
+            LOG.exception(msg, {'object_type': 'snapshot',
+                                'object_id': snapshot.id})
+            raise exception.CinderException(
+                _("Failed to create manage existing flow."))
+
+        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+            flow_engine.run()
+        return snapshot.id
index ab3c171803b2d36e89e2d17249b6953b7c41c305..f1b53c7113518ce859dfaa9638267dddb051f893 100644 (file)
@@ -73,6 +73,7 @@ class VolumeAPI(object):
                create_consistencygroup(), create_consistencygroup_from_src(),
                update_consistencygroup() and delete_consistencygroup().
         1.27 - Adds support for replication V2
+        1.28 - Adds manage_existing_snapshot
     """
 
     BASE_RPC_API_VERSION = '1.0'
@@ -82,7 +83,7 @@ class VolumeAPI(object):
         target = messaging.Target(topic=CONF.volume_topic,
                                   version=self.BASE_RPC_API_VERSION)
         serializer = objects_base.CinderObjectSerializer()
-        self.client = rpc.get_client(target, '1.27', serializer=serializer)
+        self.client = rpc.get_client(target, '1.28', serializer=serializer)
 
     def create_consistencygroup(self, ctxt, group, host):
         new_host = utils.extract_host(host)
@@ -152,10 +153,11 @@ class VolumeAPI(object):
         cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
                    snapshot=snapshot)
 
-    def delete_snapshot(self, ctxt, snapshot, host):
+    def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
         new_host = utils.extract_host(host)
         cctxt = self.client.prepare(server=new_host)
-        cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot)
+        cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
+                   unmanage_only=unmanage_only)
 
     def attach_volume(self, ctxt, volume, instance_uuid, host_name,
                       mountpoint, mode):
@@ -287,3 +289,9 @@ class VolumeAPI(object):
         new_host = utils.extract_host(volume['host'])
         cctxt = self.client.prepare(server=new_host, version='1.27')
         return cctxt.call(ctxt, 'list_replication_targets', volume=volume)
+
+    def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
+        cctxt = self.client.prepare(server=host, version='1.28')
+        cctxt.cast(ctxt, 'manage_existing_snapshot',
+                   snapshot=snapshot,
+                   ref=ref)
index 893cf3bef8b53bcf0f2843be8e9fb94d1f445459..e8d6f53c0d80311bf33053243c7608ed000d5d0a 100644 (file)
@@ -594,3 +594,29 @@ def convert_config_string_to_dict(config_string):
                     {'config_string': config_string})
 
     return resultant_dict
+
+
+def process_reserve_over_quota(context, overs, usages, quotas, size):
+    def _consumed(name):
+        return (usages[name]['reserved'] + usages[name]['in_use'])
+
+    for over in overs:
+        if 'gigabytes' in over:
+            msg = _LW("Quota exceeded for %(s_pid)s, tried to create "
+                      "%(s_size)sG snapshot (%(d_consumed)dG of "
+                      "%(d_quota)dG already consumed).")
+            LOG.warning(msg, {'s_pid': context.project_id,
+                              's_size': size,
+                              'd_consumed': _consumed(over),
+                              'd_quota': quotas[over]})
+            raise exception.VolumeSizeExceedsAvailableQuota(
+                requested=size,
+                consumed=_consumed('gigabytes'),
+                quota=quotas['gigabytes'])
+        elif 'snapshots' in over:
+            msg = _LW("Quota exceeded for %(s_pid)s, tried to create "
+                      "snapshot (%(d_consumed)d snapshots "
+                      "already consumed).")
+            LOG.warning(msg, {'s_pid': context.project_id,
+                              'd_consumed': _consumed(over)})
+            raise exception.SnapshotLimitExceeded(allowed=quotas[over])
index 7bbe497537e48a27673d8b4d2a2510c83578b87a..efe3c0488a895b7815e76e6d143957a07fcd8821 100644 (file)
@@ -78,6 +78,8 @@
     "backup:backup-export": "rule:admin_api",
 
     "snapshot_extension:snapshot_actions:update_snapshot_status": "",
+    "snapshot_extension:snapshot_manage": "rule:admin_api",
+    "snapshot_extension:snapshot_unmanage": "rule:admin_api",
 
     "consistencygroup:create" : "group:nobody",
     "consistencygroup:delete": "group:nobody",