This is take #2 for managing replication in Cinder.
This patch provides the foundation in Cinder to make volume
replication available to the cloud admin. It makes Cinder aware
of volume replicas, and allows the cloud admin to define storage
policies (volume types) that will enable replication.
In this version Cinder delegates most of the work on replication
to the driver itself.
This includes:
1. Driver exposes replication capabilities via volume type convention.
2. Extend volume table to include columns to support replication.
3. Create replicas in the driver, making it transparent to Cinder.
4. Volume manager code to handle API, updates to create_volume to
support creating test replicas.
5. Driver methods to expose per replication functions
Cinder-specs available at https://review.openstack.org/#/c/98308/
Volume replication use-case: Simplified disaster recovery
The OpenStack cloud is deployed across two metro distance data centers.
Storage backends are available in both data centers. The backends
are managed by either a single Cinder host or two, depending on the
storage backend requirements.
Storage admin configures the Cinder volume driver to support
replication.
Cloud admin creates a volume type "replicated" with extra-specs:
capabilities:replication="<is> True"
Every volume created in type "replicated" has a copy on both
backends.
In case of data center failure in first data center, the cloud admin
promotes the replica, and redeploys the VMs - they will now run on
a host in the secondary data center using the storage on the
secondary data center.
Implements: blueprint volume-replication
DocImpact
Change-Id: I964852f08b500400a27bff99e5200386e00643c9
--- /dev/null
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import webob
+from webob import exc
+
+from cinder.api import extensions
+from cinder.api.openstack import wsgi
+from cinder.api import xmlutil
+from cinder import exception
+from cinder.i18n import _
+from cinder.openstack.common import log as logging
+from cinder import replication as replicationAPI
+from cinder import volume
+
+LOG = logging.getLogger(__name__)
+
+authorize = extensions.soft_extension_authorizer('volume',
+ 'volume_replication')
+
+
class VolumeReplicationController(wsgi.Controller):
    """The Volume Replication API controller for the OpenStack API."""

    def __init__(self, *args, **kwargs):
        super(VolumeReplicationController, self).__init__(*args, **kwargs)
        # volume_api resolves volume ids; replication_api performs the
        # actual promote/reenable operations.
        self.volume_api = volume.API()
        self.replication_api = replicationAPI.API()

    def _add_replication_attributes(self, req, context, resp_volume):
        """Copy the replication columns of the cached DB volume into the
        API response dict, namespaced with the extension alias.
        """
        db_volume = req.cached_resource_by_id(resp_volume['id'])
        key = "%s:extended_status" % Volume_replication.alias
        resp_volume[key] = db_volume['replication_extended_status']
        key = "%s:driver_data" % Volume_replication.alias
        resp_volume[key] = db_volume['replication_driver_data']

    @wsgi.extends
    def show(self, req, resp_obj, id):
        """Extend volume show responses with replication attributes."""
        context = req.environ['cinder.context']
        if authorize(context):
            resp_obj.attach(xml=VolumeReplicationAttributeTemplate())
            self._add_replication_attributes(req, context,
                                             resp_obj.obj['volume'])

    @wsgi.extends
    def detail(self, req, resp_obj):
        """Extend volume detail listings with replication attributes."""
        context = req.environ['cinder.context']
        if authorize(context):
            resp_obj.attach(xml=VolumeReplicationListAttributeTemplate())
            for vol in list(resp_obj.obj['volumes']):
                self._add_replication_attributes(req, context, vol)

    @wsgi.response(202)
    @wsgi.action('os-promote-replica')
    def promote(self, req, id, body):
        """Promote a volume's secondary replica to primary (async; 202)."""
        context = req.environ['cinder.context']
        try:
            vol = self.volume_api.get(context, id)
            LOG.info(_('Attempting to promote secondary replica to primary'
                       ' for volume %s.'),
                     str(id),
                     context=context)
            self.replication_api.promote(context, vol)
        except exception.NotFound:
            msg = _("Volume could not be found")
            raise exc.HTTPNotFound(explanation=msg)
        except exception.ReplicationError as error:
            # Precondition failures (bad status, replication disabled)
            # surface as 400.
            raise exc.HTTPBadRequest(explanation=unicode(error))
        return webob.Response(status_int=202)

    @wsgi.response(202)
    @wsgi.action('os-reenable-replica')
    def reenable(self, req, id, body):
        """Re-sync a volume's secondary replica with primary (async; 202)."""
        context = req.environ['cinder.context']
        try:
            vol = self.volume_api.get(context, id)
            LOG.info(_('Attempting to sync secondary replica with primary'
                       ' for volume %s.'),
                     str(id),
                     context=context)
            self.replication_api.reenable(context, vol)
        except exception.NotFound:
            msg = _("Volume could not be found")
            raise exc.HTTPNotFound(explanation=msg)
        except exception.ReplicationError as error:
            raise exc.HTTPBadRequest(explanation=unicode(error))
        return webob.Response(status_int=202)
+
+
class Volume_replication(extensions.ExtensionDescriptor):
    """Volume replication management support."""

    name = "VolumeReplication"
    alias = "os-volume-replication"
    namespace = ("http://docs.openstack.org/volume/ext/volume_replication/"
                 "api/v1")
    updated = "2014-08-01T00:00:00+00:00"

    def get_controller_extensions(self):
        """Register the replication controller on the 'volumes' resource."""
        return [extensions.ControllerExtension(
            self, 'volumes', VolumeReplicationController())]
+
+
def make_volume(elem):
    """Attach the replication XML attributes to a volume template element."""
    for attr in ('extended_status', 'driver_data'):
        elem.set('{%s}%s' % (Volume_replication.namespace, attr),
                 '%s:%s' % (Volume_replication.alias, attr))
+
+
class VolumeReplicationAttributeTemplate(xmlutil.TemplateBuilder):
    """XML template adding replication attributes to a single volume."""

    def construct(self):
        root = xmlutil.TemplateElement('volume', selector='volume')
        make_volume(root)
        nsmap = {Volume_replication.alias: Volume_replication.namespace}
        return xmlutil.SlaveTemplate(root, 1, nsmap=nsmap)
+
+
class VolumeReplicationListAttributeTemplate(xmlutil.TemplateBuilder):
    """XML template adding replication attributes to a volume listing."""

    def construct(self):
        root = xmlutil.TemplateElement('volumes')
        child = xmlutil.SubTemplateElement(root, 'volume', selector='volumes')
        make_volume(child)
        nsmap = {Volume_replication.alias: Volume_replication.namespace}
        return xmlutil.SlaveTemplate(root, 1, nsmap=nsmap)
'links': self._get_links(request, volume['id']),
'user_id': volume.get('user_id'),
'bootable': str(volume.get('bootable')).lower(),
- 'encrypted': self._is_volume_encrypted(volume)
+ 'encrypted': self._is_volume_encrypted(volume),
+ 'replication_status': volume.get('replication_status')
}
}
else:
kwargs['source_volume'] = None
+ source_replica = volume.get('source_replica')
+ if source_replica is not None:
+ try:
+ src_vol = self.volume_api.get_volume(context,
+ source_replica)
+ if src_vol['replication_status'] == 'disabled':
+ explanation = _('source volume id:%s is not'
+ ' replicated') % source_volid
+ raise exc.HTTPNotFound(explanation=explanation)
+ kwargs['source_replica'] = src_vol
+ except exception.NotFound:
+ explanation = (_('replica source volume id:%s not found') %
+ source_replica)
+ raise exc.HTTPNotFound(explanation=explanation)
+ else:
+ kwargs['source_replica'] = None
+
size = volume.get('size', None)
if size is None and kwargs['snapshot'] is not None:
size = kwargs['snapshot']['volume_size']
elif size is None and kwargs['source_volume'] is not None:
size = kwargs['source_volume']['size']
+ elif size is None and kwargs['source_replica'] is not None:
+ size = kwargs['source_replica']['size']
LOG.info(_("Create volume of %s GB"), size, context=context)
help='Whether snapshots count against GigaByte quota'),
cfg.StrOpt('transfer_api_class',
default='cinder.transfer.api.API',
- help='The full class name of the volume transfer API class'), ]
+ help='The full class name of the volume transfer API class'),
+ cfg.StrOpt('replication_api_class',
+ default='cinder.replication.api.API',
+ help='The full class name of the volume replication API class'), ]
CONF.register_opts(global_opts)
--- /dev/null
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column
+from sqlalchemy import MetaData, String, Table
+
+from cinder.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
def upgrade(migrate_engine):
    """Add replication columns to volumes."""
    meta = MetaData()
    meta.bind = migrate_engine

    volumes = Table('volumes', meta, autoload=True)
    # NOTE(review): String(255) may be tight for replication_driver_data,
    # which holds opaque driver state -- confirm against driver requirements.
    replication_status = Column('replication_status', String(255))
    replication_extended_status = Column('replication_extended_status',
                                         String(255))
    replication_driver_data = Column('replication_driver_data', String(255))
    volumes.create_column(replication_status)
    volumes.create_column(replication_extended_status)
    volumes.create_column(replication_driver_data)
    # Pre-existing volumes are not replicated; initialize them as disabled.
    volumes.update().values(replication_status='disabled',
                            replication_extended_status=None,
                            replication_driver_data=None).execute()
+
+
def downgrade(migrate_engine):
    """Drop the replication columns added by upgrade()."""
    meta = MetaData()
    meta.bind = migrate_engine

    volumes = Table('volumes', meta, autoload=True)
    for column_name in ('replication_status',
                        'replication_extended_status',
                        'replication_driver_data'):
        volumes.drop_column(volumes.columns[column_name])
deleted = Column(Boolean, default=False)
bootable = Column(Boolean, default=False)
+ replication_status = Column(String(255))
+ replication_extended_status = Column(String(255))
+ replication_driver_data = Column(String(255))
+
class VolumeMetadata(BASE, CinderBase):
"""Represents a metadata key/value pair for a volume."""
"reference %(existing_ref)s: %(reason)s")
class ReplicationError(CinderException):
    """Raised when a replication operation on a volume cannot proceed."""
    message = _("Volume %(volume_id)s replication error: %(reason)s")
+
+
class ReplicationNotFound(NotFound):
    """Raised when no replication relationship exists for a volume."""
    message = _("Volume replication for %(volume_id)s could not be found.")
+
+
class ManageExistingVolumeTypeMismatch(CinderException):
message = _("Manage existing volume failed due to volume type mismatch: "
"%(reason)s")
--- /dev/null
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from oslo.config import cfg
+
+import cinder.openstack.common.importutils
+
+
CONF = cfg.CONF

# Resolve the replication API implementation lazily from configuration so
# deployments can substitute their own class via replication_api_class.
cls = CONF.replication_api_class
API = cinder.openstack.common.importutils.import_class(cls)
--- /dev/null
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Handles all requests relating to volume replication.
+"""
+import functools
+
+from oslo.config import cfg
+
+from cinder.db import base
+from cinder import exception
+from cinder.i18n import _
+from cinder.openstack.common import log as logging
+from cinder import policy
+from cinder import volume as cinder_volume
+from cinder.volume import rpcapi as volume_rpcapi
+from cinder.volume import utils as volume_utils
+
+CONF = cfg.CONF
+
+LOG = logging.getLogger(__name__)
+
+PROMOTE_PROCEED_STATUS = ('active', 'active-stopped')
+REENABLE_PROCEED_STATUS = ('inactive', 'active-stopped', 'error')
+
+
def wrap_check_policy(func):
    """Check policy corresponding to the wrapped methods prior to execution.

    This decorator requires the first 3 args of the wrapped function
    to be (self, context, target_obj).
    """
    @functools.wraps(func)
    def wrapped(self, context, target_obj, *args, **kwargs):
        # The policy action name is derived from the wrapped method's name.
        check_policy(context, func.__name__, target_obj)
        return func(self, context, target_obj, *args, **kwargs)
    return wrapped
+
+
def check_policy(context, action, target_obj=None):
    """Enforce the replication policy rule for *action* against the target."""
    target = dict(project_id=context.project_id,
                  user_id=context.user_id)
    if target_obj:
        target.update(target_obj)
    policy.enforce(context,
                   'volume_extension:replication:%s' % action,
                   target)
+
+
class API(base.Base):
    """API for interacting with volume replication relationships."""

    def __init__(self, db_driver=None):
        super(API, self).__init__(db_driver)
        self.volume_rpcapi = volume_rpcapi.VolumeAPI()
        self.volume_api = cinder_volume.API()

    @wrap_check_policy
    def promote(self, context, vol):
        """Promote the secondary replica of *vol* to primary.

        Preconditions (checked in order): replication must not be
        'disabled', replication_status must be in PROMOTE_PROCEED_STATUS,
        and the volume itself must be 'available'. Violations raise
        ReplicationError; the actual promotion is cast asynchronously
        to the volume manager.
        """
        if vol['replication_status'] == 'disabled':
            msg = _("Replication is not enabled for volume")
            raise exception.ReplicationError(
                reason=msg,
                volume_id=vol['id'])
        if vol['replication_status'] not in PROMOTE_PROCEED_STATUS:
            msg = _("Replication status for volume must be active or "
                    "active-stopped, but current status "
                    "is: %s") % vol['replication_status']
            raise exception.ReplicationError(
                reason=msg,
                volume_id=vol['id'])

        if vol['status'] != 'available':
            msg = _("Volume status for volume must be available, but current "
                    "status is: %s") % vol['status']
            raise exception.ReplicationError(
                reason=msg,
                volume_id=vol['id'])
        # Emit a usage notification before casting to the volume manager.
        volume_utils.notify_about_replication_usage(context,
                                                    vol,
                                                    'promote')
        self.volume_rpcapi.promote_replica(context, vol)

    @wrap_check_policy
    def reenable(self, context, vol):
        """Re-establish replication for *vol*.

        Requires replication_status in REENABLE_PROCEED_STATUS; unlike
        promote(), the volume 'status' field is not checked here.
        """
        if vol['replication_status'] == 'disabled':
            msg = _("Replication is not enabled")
            raise exception.ReplicationError(
                reason=msg,
                volume_id=vol['id'])
        if vol['replication_status'] not in REENABLE_PROCEED_STATUS:
            msg = _("Replication status for volume must be inactive,"
                    " active-stopped, or error, but current status "
                    "is: %s") % vol['replication_status']
            raise exception.ReplicationError(
                reason=msg,
                volume_id=vol['id'])

        volume_utils.notify_about_replication_usage(context,
                                                    vol,
                                                    'sync')
        self.volume_rpcapi.reenable_replication(context, vol)
volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
self.assertEqual(volume['migration_status'], 'starting')
    def test_migrate_volume_fail_replication(self):
        """Migration of a volume with an active replica must fail (400)."""
        expected_status = 400
        host = 'test2'
        ctx = context.RequestContext('admin', 'fake', True)
        volume = self._migrate_volume_prep()
        # current status is available
        volume = db.volume_create(ctx,
                                  {'status': 'available',
                                   'host': 'test',
                                   'provider_location': '',
                                   'attach_status': '',
                                   'replication_status': 'active'})
        volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
+
def test_migrate_volume_as_non_admin(self):
expected_status = 403
host = 'test2'
--- /dev/null
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for volume replication API code.
+"""
+
+import json
+
+import mock
+from oslo.config import cfg
+import webob
+
+from cinder import context
+from cinder import test
+from cinder.tests.api import fakes
+from cinder.tests import utils as tests_utils
+
+CONF = cfg.CONF
+
+
def app():
    """Return a WSGI app with the v2 API mounted and no auth middleware."""
    # no auth, just let environ['cinder.context'] pass through
    api = fakes.router.APIRouter()
    mapper = fakes.urlmap.URLMap()
    mapper['/v2'] = api
    return mapper
+
+
class VolumeReplicationAPITestCase(test.TestCase):
    """Test Cases for replication API."""

    def setUp(self):
        super(VolumeReplicationAPITestCase, self).setUp()
        self.ctxt = context.RequestContext('admin', 'fake', True)
        self.volume_params = {
            'host': CONF.host,
            'size': 1}

    def _get_resp(self, operation, volume_id, xml=False):
        """Helper for a replication action req for the specified volume_id."""
        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume_id)
        req.method = 'POST'
        if xml:
            body = '<os-%s-replica/>' % operation
            req.headers['Content-Type'] = 'application/xml'
            req.headers['Accept'] = 'application/xml'
            req.body = body
        else:
            body = {'os-%s-replica' % operation: ''}
            req.headers['Content-Type'] = 'application/json'
            req.body = json.dumps(body)
        req.environ['cinder.context'] = context.RequestContext('admin',
                                                               'fake',
                                                               True)
        res = req.get_response(app())
        return req, res

    def test_promote_bad_id(self):
        """Promote on a nonexistent volume id returns 404."""
        (req, res) = self._get_resp('promote', 'fake')
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 404, msg)

    def test_promote_bad_id_xml(self):
        (req, res) = self._get_resp('promote', 'fake', xml=True)
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 404, msg)

    def test_promote_volume_not_replicated(self):
        """Promote on a non-replicated volume returns 400."""
        volume = tests_utils.create_volume(
            self.ctxt,
            **self.volume_params)
        (req, res) = self._get_resp('promote', volume['id'])
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 400, msg)

    def test_promote_volume_not_replicated_xml(self):
        volume = tests_utils.create_volume(
            self.ctxt,
            **self.volume_params)
        (req, res) = self._get_resp('promote', volume['id'], xml=True)
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 400, msg)

    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
    def test_promote_replication_volume_status(self,
                                               _rpcapi_promote):
        """Promote requires the volume's status to be 'available'."""
        for status in ['error', 'in-use']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status=status,
                                               replication_status='active',
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'])
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 400, msg)

        for status in ['available']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status=status,
                                               replication_status='active',
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'])
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 202, msg)

    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
    def test_promote_replication_volume_status_xml(self,
                                                   _rpcapi_promote):
        for status in ['error', 'in-use']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status=status,
                                               replication_status='active',
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'], xml=True)
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 400, msg)

        for status in ['available']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status=status,
                                               replication_status='active',
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'], xml=True)
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 202, msg)

    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
    def test_promote_replication_replication_status(self,
                                                    _rpcapi_promote):
        """Promote requires replication_status active or active-stopped."""
        for status in ['error', 'copying', 'inactive']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'])
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 400, msg)

        for status in ['active', 'active-stopped']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'])
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 202, msg)

    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
    def test_promote_replication_replication_status_xml(self,
                                                        _rpcapi_promote):
        for status in ['error', 'copying', 'inactive']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'], xml=True)
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 400, msg)

        for status in ['active', 'active-stopped']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('promote', volume['id'], xml=True)
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 202, msg)

    def test_reenable_bad_id(self):
        """Reenable on a nonexistent volume id returns 404."""
        (req, res) = self._get_resp('reenable', 'fake')
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 404, msg)

    def test_reenable_bad_id_xml(self):
        (req, res) = self._get_resp('reenable', 'fake', xml=True)
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 404, msg)

    def test_reenable_volume_not_replicated(self):
        """Reenable on a non-replicated volume returns 400."""
        volume = tests_utils.create_volume(
            self.ctxt,
            **self.volume_params)
        (req, res) = self._get_resp('reenable', volume['id'])
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 400, msg)

    def test_reenable_volume_not_replicated_xml(self):
        volume = tests_utils.create_volume(
            self.ctxt,
            **self.volume_params)
        (req, res) = self._get_resp('reenable', volume['id'], xml=True)
        msg = ("request: %s\nresult: %s" % (req, res))
        self.assertEqual(res.status_int, 400, msg)

    @mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
    def test_reenable_replication_replication_status(self,
                                                     _rpcapi_promote):
        """Reenable requires replication_status in the reenable set."""
        for status in ['active', 'copying']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('reenable', volume['id'])
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 400, msg)

        for status in ['inactive', 'active-stopped', 'error']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('reenable', volume['id'])
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 202, msg)

    @mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
    def test_reenable_replication_replication_status_xml(self,
                                                         _rpcapi_promote):
        for status in ['active', 'copying']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('reenable', volume['id'], xml=True)
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 400, msg)

        for status in ['inactive', 'active-stopped', 'error']:
            volume = tests_utils.create_volume(self.ctxt,
                                               status='available',
                                               replication_status=status,
                                               **self.volume_params)
            (req, res) = self._get_resp('reenable', volume['id'], xml=True)
            msg = ("request: %s\nresult: %s" % (req, res))
            self.assertEqual(res.status_int, 202, msg)
{'key': 'readonly', 'value': 'False'}],
'bootable': False,
'launched_at': datetime.datetime(1, 1, 1, 1, 1, 1),
- 'volume_type': {'name': 'vol_type_name'}}
+ 'volume_type': {'name': 'vol_type_name'},
+ 'replication_status': 'disabled',
+ 'replication_extended_status': None,
+ 'replication_driver_data': None}
volume.update(kwargs)
if kwargs.get('volume_glance_metadata', None):
'rel': 'bookmark'}],
'metadata': {},
'name': 'Volume Test Name',
+ 'replication_status': 'disabled',
'size': 100,
'snapshot_id': None,
'source_volid': None,
'rel': 'bookmark'}],
'metadata': {},
'name': 'Volume Test Name',
+ 'replication_status': 'disabled',
'size': '1',
'snapshot_id': None,
'source_volid': None,
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'Updated Test Name',
+ 'replication_status': 'disabled',
'attachments': [
{
'id': '1',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'Updated Test Name',
+ 'replication_status': 'disabled',
'attachments': [
{
'id': '1',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'New Name',
+ 'replication_status': 'disabled',
'attachments': [
{
'id': '1',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
+ 'replication_status': 'disabled',
'attachments': [{
'id': '1',
'volume_id': '1',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'Updated Test Name',
+ 'replication_status': 'disabled',
'attachments': [{
'id': '1',
'volume_id': '1',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
+ 'replication_status': 'disabled',
'attachments': [
{
'device': '/',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
+ 'replication_status': 'disabled',
'attachments': [
{
'device': '/',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
+ 'replication_status': 'disabled',
'attachments': [
{
'device': '/',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
+ 'replication_status': 'disabled',
'attachments': [],
'user_id': 'fakeuser',
'volume_type': 'vol_type_name',
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
+ 'replication_status': 'disabled',
'attachments': [
{
'device': '/',
"backup:get_all": [],
"backup:restore": [],
"backup:backup-import": [["rule:admin_api"]],
- "backup:backup-export": [["rule:admin_api"]]
+ "backup:backup-export": [["rule:admin_api"]],
+
+ "volume_extension:replication:promote": [["rule:admin_api"]],
+ "volume_extension:replication:reenable": [["rule:admin_api"]]
}
request_spec, filter_properties,
allow_reschedule=True,
snapshot_id=None, image_id=None,
- source_volid=None):
+ source_volid=None,
+ source_replicaid=None):
self.test_inst.assertEqual(self.expected_spec, request_spec)
self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
self.test_inst.assertEqual(request_spec['image_id'], image_id)
+ self.test_inst.assertEqual(request_spec['source_replicaid'],
+ source_replicaid)
class fake_db(object):
spec = {'volume_id': None,
'source_volid': None,
'snapshot_id': None,
- 'image_id': None}
+ 'image_id': None,
+ 'source_replicaid': None}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
spec = {'volume_id': 1,
'source_volid': 2,
'snapshot_id': 3,
- 'image_id': 4}
+ 'image_id': 4,
+ 'source_replicaid': 5}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
autoload=True)
index_names = [idx.name for idx in reservations.indexes]
self.assertNotIn('reservations_deleted_expire_idx', index_names)
+
    def test_migration_024(self):
        """Test adding replication columns to volume table."""
        for (key, engine) in self.engines.items():
            migration_api.version_control(engine,
                                          TestMigrations.REPOSITORY,
                                          migration.db_initial_version())
            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 23)
            metadata = sqlalchemy.schema.MetaData()
            metadata.bind = engine

            # Upgrade to 24 and verify the three new VARCHAR columns exist.
            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)

            volumes = sqlalchemy.Table('volumes',
                                       metadata,
                                       autoload=True)
            self.assertIsInstance(volumes.c.replication_status.type,
                                  sqlalchemy.types.VARCHAR)
            self.assertIsInstance(volumes.c.replication_extended_status.type,
                                  sqlalchemy.types.VARCHAR)
            self.assertIsInstance(volumes.c.replication_driver_data.type,
                                  sqlalchemy.types.VARCHAR)

            # Downgrade back to 23 and verify the columns are dropped again.
            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 23)
            metadata = sqlalchemy.schema.MetaData()
            metadata.bind = engine

            volumes = sqlalchemy.Table('volumes',
                                       metadata,
                                       autoload=True)
            self.assertNotIn('replication_status', volumes.c)
            self.assertNotIn('replication_extended_status', volumes.c)
            self.assertNotIn('replication_driver_data', volumes.c)
--- /dev/null
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests for Volume replication code.
+"""
+
+import mock
+from oslo.config import cfg
+
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common import importutils
+from cinder import test
+from cinder.tests import utils as test_utils
+
+
+CONF = cfg.CONF
+
+
class VolumeReplicationTestCase(test.TestCase):
    """Tests for the volume manager's replica promote/reenable paths."""

    def setUp(self):
        super(VolumeReplicationTestCase, self).setUp()
        self.ctxt = context.RequestContext('user', 'fake', False)
        self.adm_ctxt = context.RequestContext('admin', 'fake', True)
        self.manager = importutils.import_object(CONF.volume_manager)
        self.manager.host = 'test_host'
        self.manager.stats = {'allocated_capacity_gb': 0}
        # Replace the real driver with a mock so driver calls can be stubbed.
        self.driver_patcher = mock.patch.object(self.manager, 'driver')
        self.driver = self.driver_patcher.start()

    @mock.patch('cinder.utils.require_driver_initialized')
    def test_promote_replica_uninit_driver(self, _init):
        """Test promote replication when driver is not initialized."""
        _init.side_effect = exception.DriverNotInitialized
        vol = test_utils.create_volume(self.ctxt,
                                       status='available',
                                       replication_status='active')
        self.driver.promote_replica.return_value = None
        self.assertRaises(exception.DriverNotInitialized,
                          self.manager.promote_replica,
                          self.adm_ctxt,
                          vol['id'])

    def test_promote_replica(self):
        """Test promote replication."""
        vol = test_utils.create_volume(self.ctxt,
                                       status='available',
                                       replication_status='active')
        # The manager should persist the model update returned by the driver.
        self.driver.promote_replica.return_value = \
            {'replication_status': 'inactive'}
        self.manager.promote_replica(self.adm_ctxt, vol['id'])
        vol_after = db.volume_get(self.ctxt, vol['id'])
        self.assertEqual(vol_after['replication_status'], 'inactive')

    def test_promote_replica_fail(self):
        """Test promote replication when promote fails."""
        vol = test_utils.create_volume(self.ctxt,
                                       status='available',
                                       replication_status='active')
        self.driver.promote_replica.side_effect = exception.CinderException
        self.assertRaises(exception.CinderException,
                          self.manager.promote_replica,
                          self.adm_ctxt,
                          vol['id'])

    def test_reenable_replication(self):
        """Test reenable replication."""
        vol = test_utils.create_volume(self.ctxt,
                                       status='available',
                                       replication_status='error')
        self.driver.reenable_replication.return_value = \
            {'replication_status': 'copying'}
        self.manager.reenable_replication(self.adm_ctxt, vol['id'])
        vol_after = db.volume_get(self.ctxt, vol['id'])
        self.assertEqual(vol_after['replication_status'], 'copying')

    @mock.patch('cinder.utils.require_driver_initialized')
    def test_reenable_replication_uninit_driver(self, _init):
        """Test reenable replication when driver is not initialized."""
        _init.side_effect = exception.DriverNotInitialized
        vol = test_utils.create_volume(self.ctxt,
                                       status='available',
                                       replication_status='error')
        self.assertRaises(exception.DriverNotInitialized,
                          self.manager.reenable_replication,
                          self.adm_ctxt,
                          vol['id'])

    def test_reenable_replication_fail(self):
        """Test reenable replication when the driver call fails."""
        vol = test_utils.create_volume(self.ctxt,
                                       status='available',
                                       replication_status='error')
        self.driver.reenable_replication.side_effect = \
            exception.CinderException
        self.assertRaises(exception.CinderException,
                          self.manager.reenable_replication,
                          self.adm_ctxt,
                          vol['id'])
'user_id': 'fake',
'launched_at': 'DONTCARE',
'size': 1,
+ 'replication_status': 'disabled',
+ 'replication_extended_status': None,
+ 'replication_driver_data': None,
}
self.assertDictMatch(msg['payload'], expected)
msg = fake_notifier.NOTIFICATIONS[1]
self.assertRaises(exception.CinderException,
self.volume.create_volume, ctxt, volume_src['id'])
    @mock.patch(
        'cinder.volume.driver.VolumeDriver.create_replica_test_volume')
    def test_create_volume_from_sourcereplica(self, _create_replica_test):
        """Test volume can be created from a volume replica."""
        _create_replica_test.return_value = None

        volume_src = tests_utils.create_volume(self.context,
                                               **self.volume_params)
        self.volume.create_volume(self.context, volume_src['id'])
        volume_dst = tests_utils.create_volume(self.context,
                                               source_replicaid=
                                               volume_src['id'],
                                               **self.volume_params)
        self.volume.create_volume(self.context, volume_dst['id'],
                                  source_replicaid=volume_src['id'])
        self.assertEqual('available',
                         db.volume_get(context.get_admin_context(),
                                       volume_dst['id']).status)
        # The driver's replica-test creation path must have been exercised.
        self.assertTrue(_create_replica_test.called)
        self.volume.delete_volume(self.context, volume_dst['id'])
        self.volume.delete_volume(self.context, volume_src['id'])
+
def test_create_volume_from_sourcevol(self):
"""Test volume can be created from a source volume."""
def fake_create_cloned_volume(volume, src_vref):
self.assertEqual(volume['status'], 'available')
def _retype_volume_exec(self, driver, snap=False, policy='on-demand',
- migrate_exc=False, exc=None, diff_equal=False):
+ migrate_exc=False, exc=None, diff_equal=False,
+ replica=False):
elevated = context.get_admin_context()
project_id = self.context.project_id
vol_type = db.volume_type_get_by_name(elevated, 'new')
db.quota_create(elevated, project_id, 'volumes_new', 10)
+ if replica:
+ rep_status = 'active'
+ else:
+ rep_status = 'disabled'
volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host, status='retyping',
- volume_type_id=old_vol_type['id'])
+ volume_type_id=old_vol_type['id'],
+ replication_status=rep_status)
if snap:
self._create_snapshot(volume['id'], size=volume['size'])
if driver or diff_equal:
self._retype_volume_exec(False, policy='never',
exc=exception.VolumeMigrationFailed)
+ def test_retype_volume_migration_with_replica(self):
+ self._retype_volume_exec(False,
+ replica=True,
+ exc=exception.InvalidVolume)
+
def test_retype_volume_migration_with_snaps(self):
self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume)
snapshot_id='fake_snapshot_id',
image_id='fake_image_id',
source_volid='fake_src_id',
+ source_replicaid='fake_replica_id',
version='1.4')
def test_create_volume_serialization(self):
snapshot_id='fake_snapshot_id',
image_id='fake_image_id',
source_volid='fake_src_id',
+ source_replicaid='fake_replica_id',
version='1.4')
def test_delete_volume(self):
volume=self.fake_volume,
ref={'lv_name': 'foo'},
version='1.15')
+
+ def test_promote_replica(self):
+ self._test_volume_api('promote_replica',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ version='1.17')
+
+ def test_reenable_replica(self):
+ self._test_volume_api('reenable_replication',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ version='1.17')
size=1,
availability_zone='fake_az',
volume_type_id=None,
+ replication_status='disabled',
+ replication_extended_status=None,
+ replication_driver_data=None,
**kwargs):
"""Create a volume object in the DB."""
vol = {}
vol['volume_type_id'] = volume_type_id
for key in kwargs:
vol[key] = kwargs[key]
+ vol['replication_status'] = replication_status
+ vol['replication_extended_status'] = replication_extended_status
+ vol['replication_driver_data'] = replication_driver_data
+
return db.volume_create(ctxt, vol)
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
- scheduler_hints=None, backup_source_volume=None):
+ scheduler_hints=None, backup_source_volume=None,
+ source_replica=None):
if source_volume and volume_type:
if volume_type['id'] != source_volume['volume_type_id']:
"You should omit the argument.")
raise exception.InvalidInput(reason=msg)
+ # When cloning replica (for testing), volume type must be omitted
+ if source_replica and volume_type:
+ msg = _("No volume_type should be provided when creating test "
+ "replica, type must be omitted.")
+ raise exception.InvalidInput(reason=msg)
+
if snapshot and volume_type:
if volume_type['id'] != snapshot['volume_type_id']:
msg = _("Invalid volume_type provided (requested type "
'scheduler_hints': scheduler_hints,
'key_manager': self.key_manager,
'backup_source_volume': backup_source_volume,
+ 'source_replica': source_replica,
'optional_args': {'is_quota_committed': False}
}
try:
msg = _("Snapshot cannot be created while volume is migrating")
raise exception.InvalidVolume(reason=msg)
+ if volume['status'].startswith('replica_'):
+ # Can't snapshot secondary replica
+ msg = _("Snapshot of secondary replica is not allowed.")
+ raise exception.InvalidVolume(reason=msg)
+
if ((not force) and (volume['status'] != "available")):
msg = _("must be available")
raise exception.InvalidVolume(reason=msg)
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
+ # We only handle non-replicated volumes for now
+ rep_status = volume['replication_status']
+ if rep_status is not None and rep_status != 'disabled':
+ msg = _("Volume must not be replicated.")
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+
# Make sure the host is in the list of available hosts
elevated = context.elevated()
topic = CONF.volume_topic
def create_volume(self, volume):
"""Creates a volume. Can optionally return a Dictionary of
changes to the volume object to be persisted.
+
+ If volume_type extra specs includes
+ 'capabilities:replication <is> True' the driver
+ needs to create a volume replica (secondary), and setup replication
+ between the newly created volume and the secondary volume.
+ Returned dictionary should include:
+ volume['replication_status'] = 'copying'
+ volume['replication_extended_status'] = driver specific value
+ volume['replication_driver_data'] = driver specific value
+
"""
raise NotImplementedError()
def create_volume_from_snapshot(self, volume, snapshot):
- """Creates a volume from a snapshot."""
+ """Creates a volume from a snapshot.
+
+ If volume_type extra specs includes 'replication: <is> True'
+ the driver needs to create a volume replica (secondary),
+ and setup replication between the newly created volume and
+ the secondary volume.
+ """
+
raise NotImplementedError()
def create_cloned_volume(self, volume, src_vref):
- """Creates a clone of the specified volume."""
+ """Creates a clone of the specified volume.
+
+ If volume_type extra specs includes 'replication: <is> True' the
+ driver needs to create a volume replica (secondary)
+ and setup replication between the newly created volume
+ and the secondary volume.
+
+ """
+
+ raise NotImplementedError()
+
+ def create_replica_test_volume(self, volume, src_vref):
+ """Creates a test replica clone of the specified replicated volume.
+
+ Create a clone of the replicated (secondary) volume.
+
+ """
raise NotImplementedError()
def delete_volume(self, volume):
- """Deletes a volume."""
+ """Deletes a volume.
+
+ If volume_type extra specs includes 'replication: <is> True'
+ then the driver needs to delete the volume replica too.
+
+ """
raise NotImplementedError()
def create_snapshot(self, snapshot):
def get_volume_stats(self, refresh=False):
"""Return the current state of the volume service. If 'refresh' is
True, run the update first.
+
+ For replication the following state should be reported:
+ replication_support = True (None or false disables replication)
+
"""
return None
def retype(self, context, volume, new_type, diff, host):
"""Convert the volume to be of the new type.
- Returns a boolean indicating whether the retype occurred.
+ Returns either:
+ A boolean indicating whether the retype occurred, or
+ A tuple (retyped, model_update) where retyped is a boolean
+ indicating if the retype occurred, and the model_update includes
+ changes for the volume db.
+ if diff['extra_specs'] includes 'replication' then:
+ if ('True', _ ) then replication should be disabled:
+ Volume replica should be deleted
+ volume['replication_status'] should be changed to 'disabled'
+ volume['replication_extended_status'] = None
+ volume['replication_driver_data'] = None
+ if (_, 'True') then replication should be enabled:
+ Volume replica (secondary) should be created, and replication
+ should be setup between the volume and the newly created
+ replica
+ volume['replication_status'] = 'copying'
+ volume['replication_extended_status'] = driver specific value
+ volume['replication_driver_data'] = driver specific value
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
host['host'] is its name, and host['capabilities'] is a
dictionary of its reported capabilities.
"""
- return False
+ return False, None
def accept_transfer(self, context, volume, new_user, new_project):
"""Accept the transfer of a volume for a new user/project."""
def validate_connector_has_setting(connector, setting):
pass
+ def reenable_replication(self, context, volume):
+ """Re-enable replication between the replica and primary volume.
+
+ This is used to re-enable/fix the replication between primary
+ and secondary. One use is as part of the fail-back process, when
+ you re-synchronize your old primary with the promoted volume
+ (the old replica).
+ Returns model_update for the volume to reflect the actions of the
+ driver.
+ The driver is expected to update the following entries:
+ 'replication_status'
+ 'replication_extended_status'
+ 'replication_driver_data'
+ Possible 'replication_status' values (in model_update) are:
+ 'error' - replication in error state
+ 'copying' - replication copying data to secondary (inconsistent)
+ 'active' - replication copying data to secondary (consistent)
+ 'active-stopped' - replication data copy on hold (consistent)
+ 'inactive' - replication data copy on hold (inconsistent)
+ Values in 'replication_extended_status' and 'replication_driver_data'
+ are managed by the driver.
+
+ :param context: Context
+ :param volume: A dictionary describing the volume
+
+ """
+ msg = _("sync_replica not implemented.")
+ raise NotImplementedError(msg)
+
+ def get_replication_status(self, context, volume):
+ """Query the actual volume replication status from the driver.
+
+ Returns model_update for the volume.
+ The driver is expected to update the following entries:
+ 'replication_status'
+ 'replication_extended_status'
+ 'replication_driver_data'
+ Possible 'replication_status' values (in model_update) are:
+ 'error' - replication in error state
+ 'copying' - replication copying data to secondary (inconsistent)
+ 'active' - replication copying data to secondary (consistent)
+ 'active-stopped' - replication data copy on hold (consistent)
+ 'inactive' - replication data copy on hold (inconsistent)
+ Values in 'replication_extended_status' and 'replication_driver_data'
+ are managed by the driver.
+
+ :param context: Context
+ :param volume: A dictionary describing the volume
+ """
+ return None
+
+ def promote_replica(self, context, volume):
+ """Promote the replica to be the primary volume.
+
+ Following this command, replication between the volumes at
+ the storage level should be stopped, the replica should be
+ available to be attached, and the replication status should
+ be in status 'inactive'.
+
+ Returns model_update for the volume.
+ The driver is expected to update the following entries:
+ 'replication_status'
+ 'replication_extended_status'
+ 'replication_driver_data'
+ Possible 'replication_status' values (in model_update) are:
+ 'error' - replication in error state
+ 'inactive' - replication data copy on hold (inconsistent)
+ Values in 'replication_extended_status' and 'replication_driver_data'
+ are managed by the driver.
+
+ :param context: Context
+ :param volume: A dictionary describing the volume
+ """
+ msg = _("promote_replica not implemented.")
+ raise NotImplementedError(msg)
+
# ####### Interface methods for DataPath (Connector) ########
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a volume."""
# from, 'error' being the common example.
SNAPSHOT_PROCEED_STATUS = ('available',)
SRC_VOL_PROCEED_STATUS = ('available', 'in-use',)
+REPLICA_PROCEED_STATUS = ('active', 'active-stopped')
class ExtractVolumeRequestTask(flow_utils.CinderTask):
# reconstructed elsewhere and continued).
default_provides = set(['availability_zone', 'size', 'snapshot_id',
'source_volid', 'volume_type', 'volume_type_id',
- 'encryption_key_id'])
+ 'encryption_key_id', 'source_replicaid'])
def __init__(self, image_service, availability_zones, **kwargs):
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
source_volid = source_volume['id']
return source_volid
+ @staticmethod
+ def _extract_source_replica(source_replica):
+ """Extracts the volume id from the provided replica (if provided).
+
+ This function validates the input source_replica dict and checks that
+ the status of that replica volume is valid for creating a volume from.
+ """
+
+ source_replicaid = None
+ if source_replica is not None:
+ if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
+ msg = _("Unable to create a volume from an originating source"
+ " volume when its status is not one of %s"
+ " values")
+ msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
+ # TODO(harlowja): what happens if the status changes after this
+ # initial volume status check occurs??? Seems like someone
+ # could delete the volume after this check passes but before
+ # the volume is officially created?
+ raise exception.InvalidVolume(reason=msg)
+ replication_status = source_replica['replication_status']
+ if replication_status not in REPLICA_PROCEED_STATUS:
+ msg = _("Unable to create a volume from a replica"
+ " when replication status is not one of %s"
+ " values")
+ msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
+ # TODO(ronenkat): what happens if the replication status
+ # changes after this initial volume status check occurs???
+ raise exception.InvalidVolume(reason=msg)
+ source_replicaid = source_replica['id']
+ return source_replicaid
+
@staticmethod
def _extract_size(size, source_volume, snapshot):
"""Extracts and validates the volume size.
def execute(self, context, size, snapshot, image_id, source_volume,
availability_zone, volume_type, metadata,
- key_manager, backup_source_volume):
+ key_manager, backup_source_volume, source_replica):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
# volume will remain available after we do this initial verification??
snapshot_id = self._extract_snapshot(snapshot)
source_volid = self._extract_source_volume(source_volume)
+ source_replicaid = self._extract_source_replica(source_replica)
size = self._extract_size(size, source_volume, snapshot)
self._check_image_metadata(context, image_id, size)
# should copy encryption metadata from the encrypted volume type to the
# volume upon creation and propagate that information to each snapshot.
# This strategy avoid any dependency upon the encrypted volume type.
+ def_vol_type = volume_types.get_default_volume_type()
if not volume_type and not source_volume and not snapshot:
- volume_type = volume_types.get_default_volume_type()
+ volume_type = def_vol_type
+
+ # When creating a clone of a replica (replication test), we can't
+ # use the volume type of the replica, therefore, we use the default.
+ # NOTE(ronenkat): this assumes the default type is not replicated.
+ if source_replicaid:
+ volume_type = def_vol_type
volume_type_id = self._get_volume_type_id(volume_type,
source_volume, snapshot,
'volume_type_id': volume_type_id,
'encryption_key_id': encryption_key_id,
'qos_specs': specs,
+ 'source_replicaid': source_replicaid,
}
def __init__(self, db):
requires = ['availability_zone', 'description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
- 'source_volid', 'volume_type_id', 'encryption_key_id']
+ 'source_volid', 'volume_type_id', 'encryption_key_id',
+ 'source_replicaid']
super(EntryCreateTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
# Rename these to the internal name.
'display_description': kwargs.pop('description'),
'display_name': kwargs.pop('name'),
+ 'replication_status': 'disabled',
}
# Merge in the other required arguments which should provide the rest
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
requires = ['image_id', 'scheduler_hints', 'snapshot_id',
'source_volid', 'volume_id', 'volume_type',
- 'volume_properties']
+ 'volume_properties', 'source_replicaid']
super(VolumeCastTask, self).__init__(addons=[ACTION],
requires=requires)
self.volume_rpcapi = volume_rpcapi
def _cast_create_volume(self, context, request_spec, filter_properties):
source_volid = request_spec['source_volid']
+ source_replicaid = request_spec['source_replicaid']
volume_id = request_spec['volume_id']
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
elif source_volid:
source_volume_ref = self.db.volume_get(context, source_volid)
host = source_volume_ref['host']
+ elif source_replicaid:
+ source_volume_ref = self.db.volume_get(context, source_replicaid)
+ host = source_volume_ref['host']
if not host:
# Cast to the scheduler and let it handle whatever is needed
allow_reschedule=False,
snapshot_id=snapshot_id,
image_id=image_id,
- source_volid=source_volid)
+ source_volid=source_volid,
+ source_replicaid=source_replicaid)
def execute(self, context, **kwargs):
scheduler_hints = kwargs.pop('scheduler_hints', None)
default_provides = 'volume_spec'
def __init__(self, db):
- requires = ['image_id', 'snapshot_id', 'source_volid']
+ requires = ['image_id', 'snapshot_id', 'source_volid',
+ 'source_replicaid']
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
'source_volstatus': source_volume_ref['status'],
'type': 'source_vol',
})
+ elif kwargs.get('source_replicaid'):
+ # We are making a clone based on the replica.
+ #
+ # NOTE(harlowja): This will likely fail if the replica
+ # disappeared by the time this call occurred.
+ source_volid = kwargs['source_replicaid']
+ source_volume_ref = self.db.volume_get(context, source_volid)
+ specs.update({
+ 'source_replicaid': source_volid,
+ 'source_replicastatus': source_volume_ref['status'],
+ 'type': 'source_replica',
+ })
elif kwargs.get('image_id'):
# We are making an image based volume instead of a raw volume.
image_href = kwargs['image_id']
context,
source_volid,
volume_id)
+ elif kwargs.get('source_replicaid'):
+ src_type = 'source replica'
+ src_id = kwargs['source_replicaid']
+ source_replicaid = src_id
+ LOG.debug(log_template % {'src_type': src_type,
+ 'src_id': src_id,
+ 'vol_id': volume_id})
+ self.db.volume_glance_metadata_copy_from_volume_to_volume(
+ context,
+ source_replicaid,
+ volume_id)
elif kwargs.get('image_id'):
src_type = 'image'
src_id = kwargs['image_id']
source_volid=source_volid)
return model_update
+ def _create_from_source_replica(self, context, volume_ref,
+ source_replicaid, **kwargs):
+ # NOTE(harlowja): if the source volume has disappeared this will be our
+ # detection of that since this database call should fail.
+ #
+ # NOTE(harlowja): likely this is not the best place for this to happen
+ # and we should have proper locks on the source volume while actions
+ # that use the source volume are underway.
+ srcvol_ref = self.db.volume_get(context, source_replicaid)
+ model_update = self.driver.create_replica_test_volume(volume_ref,
+ srcvol_ref)
+ # NOTE(harlowja): Subtasks would be useful here since after this
+ # point the volume has already been created and further failures
+ # will not destroy the volume (although they could in the future).
+ if srcvol_ref.bootable:
+ self._handle_bootable_volume_glance_meta(
+ context,
+ volume_ref['id'],
+ source_replicaid=source_replicaid)
+ return model_update
+
def _copy_image_to_volume(self, context, volume_ref,
image_id, image_location, image_service):
"""Downloads Glance image to the specified volume."""
elif create_type == 'source_vol':
model_update = self._create_from_source_volume(
context, volume_ref=volume_ref, **volume_spec)
+ elif create_type == 'source_replica':
+ model_update = self._create_from_source_replica(
+ context, volume_ref=volume_ref, **volume_spec)
elif create_type == 'image':
model_update = self._create_from_image(context,
volume_ref=volume_ref,
def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
allow_reschedule, reschedule_context, request_spec,
filter_properties, snapshot_id=None, image_id=None,
- source_volid=None):
+ source_volid=None, source_replicaid=None):
"""Constructs and returns the manager entrypoint flow.
This flow will do the following:
'snapshot_id': snapshot_id,
'source_volid': source_volid,
'volume_id': volume_id,
+ 'source_replicaid': source_replicaid,
}
volume_flow.add(ExtractVolumeRefTask(db, host))
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
- RPC_API_VERSION = '1.16'
+ RPC_API_VERSION = '1.17'
target = messaging.Target(version=RPC_API_VERSION)
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
- snapshot_id=None, image_id=None, source_volid=None):
+ snapshot_id=None, image_id=None, source_volid=None,
+ source_replicaid=None):
"""Creates the volume."""
context_saved = context.deepcopy()
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
+ source_replicaid=source_replicaid,
allow_reschedule=allow_reschedule,
reschedule_context=context_saved,
request_spec=request_spec,
except Exception:
LOG.exception(_("Failed to create manager volume flow"))
raise exception.CinderException(
- _("Failed to create manager volume flow"))
+ _("Failed to create manager volume flow."))
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
elif source_volid is not None:
# Make sure the volume is not deleted until we are done with it.
locked_action = "%s-%s" % (source_volid, 'delete_volume')
+ elif source_replicaid is not None:
+ # Make sure the volume is not deleted until we are done with it.
+ locked_action = "%s-%s" % (source_replicaid, 'delete_volume')
else:
locked_action = None
retyped = True
# Call driver to try and change the type
+ retype_model_update = None
if not retyped:
try:
new_type = volume_types.get_volume_type(context, new_type_id)
- retyped = self.driver.retype(context, volume_ref, new_type,
- diff, host)
+ ret = self.driver.retype(context,
+ volume_ref,
+ new_type,
+ diff,
+ host)
+ # Check if the driver retype provided a model update or
+ # just a retype indication
+ if type(ret) == tuple:
+ retyped, retype_model_update = ret
+ else:
+ retyped = ret
+
if retyped:
LOG.info(_("Volume %s: retyped successfully"), volume_id)
except Exception as ex:
msg = _("Volume must not have snapshots.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
+
+ # Don't allow volume with replicas to be migrated
+ rep_status = volume_ref['replication_status']
+ if rep_status is not None and rep_status != 'disabled':
+ _retype_error(context, volume_id, old_reservations,
+ new_reservations, status_update)
+ msg = _("Volume must not be replicated.")
+ LOG.error(msg)
+ raise exception.InvalidVolume(reason=msg)
+
self.db.volume_update(context, volume_ref['id'],
{'migration_status': 'starting'})
_retype_error(context, volume_id, old_reservations,
new_reservations, status_update)
else:
- self.db.volume_update(context, volume_id,
- {'volume_type_id': new_type_id,
- 'host': host['host'],
- 'status': status_update['status']})
+ model_update = {'volume_type_id': new_type_id,
+ 'host': host['host'],
+ 'status': status_update['status']}
+ if retype_model_update:
+ model_update.update(retype_model_update)
+ self.db.volume_update(context, volume_id, model_update)
if old_reservations:
QUOTAS.commit(context, old_reservations, project_id=project_id)
self.publish_service_capabilities(context)
def manage_existing(self, ctxt, volume_id, ref=None):
- LOG.debug('manage_existing: managing %s' % ref)
+ LOG.debug('manage_existing: managing %s.' % ref)
try:
flow_engine = manage_existing.get_flow(
ctxt,
# Update volume stats
self.stats['allocated_capacity_gb'] += volume_ref['size']
return volume_ref['id']
+
+ def promote_replica(self, ctxt, volume_id):
+ """Promote volume replica secondary to be the primary volume."""
+ try:
+ utils.require_driver_initialized(self.driver)
+ except exception.DriverNotInitialized:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_("Failed to promote replica for volume %(id)s.")
+ % {'id': volume_id})
+
+ volume = self.db.volume_get(ctxt, volume_id)
+ model_update = None
+ try:
+ LOG.debug("Volume %s: promote replica.", volume_id)
+ model_update = self.driver.promote_replica(ctxt, volume)
+ except exception.CinderException:
+ err_msg = (_('Error promoting secondary volume to primary'))
+ raise exception.ReplicationError(reason=err_msg,
+ volume_id=volume_id)
+
+ try:
+ if model_update:
+ volume = self.db.volume_update(ctxt,
+ volume_id,
+ model_update)
+ except exception.CinderException:
+ err_msg = (_("Failed updating model"
+ " with driver provided model %(model)s") %
+ {'model': model_update})
+ raise exception.ReplicationError(reason=err_msg,
+ volume_id=volume_id)
+
+ def reenable_replication(self, ctxt, volume_id):
+ """Re-enable replication of secondary volume with primary volumes."""
+ try:
+ utils.require_driver_initialized(self.driver)
+ except exception.DriverNotInitialized:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(_("Failed to sync replica for volume %(id)s.")
+ % {'id': volume_id})
+
+ volume = self.db.volume_get(ctxt, volume_id)
+ model_update = None
+ try:
+ LOG.debug("Volume %s: sync replica.", volume_id)
+ model_update = self.driver.reenable_replication(ctxt, volume)
+ except exception.CinderException:
+ err_msg = (_('Error synchronizing secondary volume to primary'))
+ raise exception.ReplicationError(reason=err_msg,
+ volume_id=volume_id)
+
+ try:
+ if model_update:
+ volume = self.db.volume_update(ctxt,
+ volume_id,
+ model_update)
+ except exception.CinderException:
+ err_msg = (_("Failed updating model"
+ " with driver provided model %(model)s") %
+ {'model': model_update})
+ raise exception.ReplicationError(reason=err_msg,
+ volume_id=volume_id)
+
+ @periodic_task.periodic_task
+ def _update_replication_relationship_status(self, ctxt):
+ LOG.info(_('Updating volume replication status.'))
+ if not self.driver.initialized:
+ if self.driver.configuration.config_group is None:
+ config_group = ''
+ else:
+ config_group = ('(config name %s)' %
+ self.driver.configuration.config_group)
+
+ LOG.warning(_('Unable to update volume replication status, '
+ '%(driver_name)s -%(driver_version)s '
+ '%(config_group)s driver is uninitialized.') %
+ {'driver_name': self.driver.__class__.__name__,
+ 'driver_version': self.driver.get_version(),
+ 'config_group': config_group})
+ else:
+ volumes = self.db.volume_get_all_by_host(ctxt, self.host)
+ for vol in volumes:
+ model_update = None
+ try:
+ model_update = self.driver.get_replication_status(
+ ctxt, vol)
+ if model_update:
+ self.db.volume_update(ctxt,
+ vol['id'],
+ model_update)
+ except Exception:
+ LOG.exception(_("Error checking replication status for "
+ "volume %s") % vol['id'])
1.14 - Adds reservation parameter to extend_volume().
1.15 - Adds manage_existing and unmanage_only flag to delete_volume.
1.16 - Removes create_export.
+ 1.17 - Add replica option to create_volume, promote_replica and
+ sync_replica.
'''
BASE_RPC_API_VERSION = '1.0'
super(VolumeAPI, self).__init__()
target = messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
- self.client = rpc.get_client(target, '1.15')
+ self.client = rpc.get_client(target, '1.17')
def create_volume(self, ctxt, volume, host,
request_spec, filter_properties,
allow_reschedule=True,
snapshot_id=None, image_id=None,
+ source_replicaid=None,
source_volid=None):
cctxt = self.client.prepare(server=host, version='1.4')
allow_reschedule=allow_reschedule,
snapshot_id=snapshot_id,
image_id=image_id,
+ source_replicaid=source_replicaid,
source_volid=source_volid),
def delete_volume(self, ctxt, volume, unmanage_only=False):
def manage_existing(self, ctxt, volume, ref):
cctxt = self.client.prepare(server=volume['host'], version='1.15')
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
+
+ def promote_replica(self, ctxt, volume):
+ cctxt = self.client.prepare(server=volume['host'], version='1.17')
+ cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
+
+ def reenable_replication(self, ctxt, volume):
+ cctxt = self.client.prepare(server=volume['host'], version='1.17')
+ cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
created_at=null_safe_str(volume_ref['created_at']),
status=volume_ref['status'],
snapshot_id=volume_ref['snapshot_id'],
- size=volume_ref['size'])
+ size=volume_ref['size'],
+ replication_status=volume_ref['replication_status'],
+ replication_extended_status=
+ volume_ref['replication_extended_status'],
+ replication_driver_data=
+ volume_ref['replication_driver_data'],
+ )
usage_info.update(kw)
return usage_info
usage_info)
+def notify_about_replication_usage(context, volume, suffix,
+ extra_usage_info=None, host=None):
+ if not host:
+ host = CONF.host
+
+ if not extra_usage_info:
+ extra_usage_info = {}
+
+ usage_info = _usage_from_volume(context,
+ volume,
+ **extra_usage_info)
+
+ rpc.get_notifier('replication', host).info(context,
+ 'replication.%s' % suffix,
+ usage_info)
+
+
+def notify_about_replication_error(context, volume, suffix,
+ extra_error_info=None, host=None):
+ if not host:
+ host = CONF.host
+
+ if not extra_error_info:
+ extra_error_info = {}
+
+ usage_info = _usage_from_volume(context,
+ volume,
+ **extra_error_info)
+
+ rpc.get_notifier('replication', host).error(context,
+ 'replication.%s' % suffix,
+ usage_info)
+
+
def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute):
if not bps_limit:
LOG.debug('Not using bps rate limiting on volume copy')
# value)
#transfer_api_class=cinder.transfer.api.API
+# The full class name of the volume replication API class
+# (string value)
+#replication_api_class=cinder.replication.api.API
+
#
# Options defined in cinder.compute
"volume:delete_transfer": [],
"volume:get_all_transfers": [],
+ "volume_extension:replication:promote": ["rule:admin_api"],
+ "volume_extension:replication:reenable": ["rule:admin_api"],
+
"backup:create" : [],
"backup:delete": [],
"backup:get": [],