From 1c8f49bfe9fe3abd713e28922d5551f71228624c Mon Sep 17 00:00:00 2001 From: Ronen Kat Date: Sat, 26 Jul 2014 17:06:52 +0300 Subject: [PATCH] Add support in Cinder for volume replication - driver approach This is take #2 for managing replicaiton in Cinder. This patch provides the foundation in Cinder to make volume replication available to the cloud admin. It makes Cinder aware of volume replicas, and allows the cloud admin to define storage policies (volume types) that will enable replication. In this version Cinder delegates most the work on replication to the driver itself. This includes: 1. Driver exposes replication capabilities via volume type convention. 2. Extend volume table to include columns to support replicaion. 3. Create replicas in the driver, making it transparant to Cinder. 4. Volume manager code to handle API, updates to create_volume to support creating test replicas. 5. Driver methods to expose per replication functions Cinder-specs available at https://review.openstack.org/#/c/98308/ Volume replication use-case: Simplified disaster recovery The OpenStack cloud is deployed across two metro distance data centers. Storage backends are available in both data ceneters. The backends are managed by either a single Cinder host or two, depending on the storage backend requirements. Storage admin configures the Cinder volume driver to support replication. Cloud admin creates a volume type "replicated" with extra-specs: capabilities:replication=" True" Every volume created in type "replicated" has a copy on both backends. In case of data center failure in first data center, the cloud admin promotes the replica, and redeploy the VMs - they will now run on a host in the secondary data center using the storage on the secondary data center. Implements: blueprint volume-replication DocImpact Change-Id: I964852f08b500400a27bff99e5200386e00643c9 --- cinder/api/contrib/volume_replication.py | 139 ++++++++++ cinder/api/v2/views/volumes.py | 3 +- cinder/api/v2/volumes.py | 19 ++ cinder/common/config.py | 5 +- .../versions/024_add_replication_support.py | 51 ++++ cinder/db/sqlalchemy/models.py | 4 + cinder/exception.py | 10 + cinder/replication/__init__.py | 24 ++ cinder/replication/api.py | 114 ++++++++ .../tests/api/contrib/test_admin_actions.py | 14 + .../api/contrib/test_volume_replication.py | 246 ++++++++++++++++++ cinder/tests/api/v2/stubs.py | 5 +- cinder/tests/api/v2/test_volumes.py | 12 + cinder/tests/policy.json | 5 +- cinder/tests/test_create_volume_flow.py | 11 +- cinder/tests/test_migrations.py | 33 +++ cinder/tests/test_replication.py | 111 ++++++++ cinder/tests/test_volume.py | 40 ++- cinder/tests/test_volume_rpcapi.py | 14 + cinder/tests/utils.py | 7 + cinder/volume/api.py | 22 +- cinder/volume/driver.py | 145 ++++++++++- cinder/volume/flows/api/create_volume.py | 61 ++++- cinder/volume/flows/manager/create_volume.py | 53 +++- cinder/volume/manager.py | 141 +++++++++- cinder/volume/rpcapi.py | 14 +- cinder/volume/utils.py | 42 ++- etc/cinder/cinder.conf.sample | 4 + etc/cinder/policy.json | 3 + 29 files changed, 1317 insertions(+), 35 deletions(-) create mode 100644 cinder/api/contrib/volume_replication.py create mode 100644 cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py create mode 100644 cinder/replication/__init__.py create mode 100644 cinder/replication/api.py create mode 100644 cinder/tests/api/contrib/test_volume_replication.py create mode 100644 cinder/tests/test_replication.py diff --git a/cinder/api/contrib/volume_replication.py b/cinder/api/contrib/volume_replication.py new file mode 100644 index 000000000..78756b691 --- /dev/null +++ b/cinder/api/contrib/volume_replication.py @@ -0,0 +1,139 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import webob +from webob import exc + +from cinder.api import extensions +from cinder.api.openstack import wsgi +from cinder.api import xmlutil +from cinder import exception +from cinder.i18n import _ +from cinder.openstack.common import log as logging +from cinder import replication as replicationAPI +from cinder import volume + +LOG = logging.getLogger(__name__) + +authorize = extensions.soft_extension_authorizer('volume', + 'volume_replication') + + +class VolumeReplicationController(wsgi.Controller): + """The Volume Replication API controller for the Openstack API.""" + + def __init__(self, *args, **kwargs): + super(VolumeReplicationController, self).__init__(*args, **kwargs) + self.volume_api = volume.API() + self.replication_api = replicationAPI.API() + + def _add_replication_attributes(self, req, context, resp_volume): + db_volume = req.cached_resource_by_id(resp_volume['id']) + key = "%s:extended_status" % Volume_replication.alias + resp_volume[key] = db_volume['replication_extended_status'] + key = "%s:driver_data" % Volume_replication.alias + resp_volume[key] = db_volume['replication_driver_data'] + + @wsgi.extends + def show(self, req, resp_obj, id): + context = req.environ['cinder.context'] + if authorize(context): + resp_obj.attach(xml=VolumeReplicationAttributeTemplate()) + self._add_replication_attributes(req, context, + resp_obj.obj['volume']) + + @wsgi.extends + def detail(self, req, resp_obj): + context = req.environ['cinder.context'] + if authorize(context): + resp_obj.attach(xml=VolumeReplicationListAttributeTemplate()) + for vol in list(resp_obj.obj['volumes']): + self._add_replication_attributes(req, context, vol) + + @wsgi.response(202) + @wsgi.action('os-promote-replica') + def promote(self, req, id, body): + context = req.environ['cinder.context'] + try: + vol = self.volume_api.get(context, id) + LOG.info(_('Attempting to promote secondary replica to primary' + ' for volume %s.'), + str(id), + context=context) + self.replication_api.promote(context, vol) + except exception.NotFound: + msg = _("Volume could not be found") + raise exc.HTTPNotFound(explanation=msg) + except exception.ReplicationError as error: + raise exc.HTTPBadRequest(explanation=unicode(error)) + return webob.Response(status_int=202) + + @wsgi.response(202) + @wsgi.action('os-reenable-replica') + def reenable(self, req, id, body): + context = req.environ['cinder.context'] + try: + vol = self.volume_api.get(context, id) + LOG.info(_('Attempting to sync secondary replica with primary' + ' for volume %s.'), + str(id), + context=context) + self.replication_api.reenable(context, vol) + except exception.NotFound: + msg = _("Volume could not be found") + raise exc.HTTPNotFound(explanation=msg) + except exception.ReplicationError as error: + raise exc.HTTPBadRequest(explanation=unicode(error)) + return webob.Response(status_int=202) + + +class Volume_replication(extensions.ExtensionDescriptor): + """Volume replication management support.""" + + name = "VolumeReplication" + alias = "os-volume-replication" + namespace = "http://docs.openstack.org/volume/ext/volume_replication/" + \ + "api/v1" + updated = "2014-08-01T00:00:00+00:00" + + def get_controller_extensions(self): + controller = VolumeReplicationController() + extension = extensions.ControllerExtension(self, 'volumes', controller) + return [extension] + + +def make_volume(elem): + elem.set('{%s}extended_status' % Volume_replication.namespace, + '%s:extended_status' % Volume_replication.alias) + elem.set('{%s}driver_data' % Volume_replication.namespace, + '%s:driver_data' % Volume_replication.alias) + + +class VolumeReplicationAttributeTemplate(xmlutil.TemplateBuilder): + def construct(self): + root = xmlutil.TemplateElement('volume', selector='volume') + make_volume(root) + alias = Volume_replication.alias + namespace = Volume_replication.namespace + return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace}) + + +class VolumeReplicationListAttributeTemplate(xmlutil.TemplateBuilder): + def construct(self): + root = xmlutil.TemplateElement('volumes') + elem = xmlutil.SubTemplateElement(root, 'volume', selector='volumes') + make_volume(elem) + alias = Volume_replication.alias + namespace = Volume_replication.namespace + return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace}) diff --git a/cinder/api/v2/views/volumes.py b/cinder/api/v2/views/volumes.py index 9a034d327..d19eb04dd 100644 --- a/cinder/api/v2/views/volumes.py +++ b/cinder/api/v2/views/volumes.py @@ -68,7 +68,8 @@ class ViewBuilder(common.ViewBuilder): 'links': self._get_links(request, volume['id']), 'user_id': volume.get('user_id'), 'bootable': str(volume.get('bootable')).lower(), - 'encrypted': self._is_volume_encrypted(volume) + 'encrypted': self._is_volume_encrypted(volume), + 'replication_status': volume.get('replication_status') } } diff --git a/cinder/api/v2/volumes.py b/cinder/api/v2/volumes.py index 9c6f17da9..9cb980ba9 100644 --- a/cinder/api/v2/volumes.py +++ b/cinder/api/v2/volumes.py @@ -326,11 +326,30 @@ class VolumeController(wsgi.Controller): else: kwargs['source_volume'] = None + source_replica = volume.get('source_replica') + if source_replica is not None: + try: + src_vol = self.volume_api.get_volume(context, + source_replica) + if src_vol['replication_status'] == 'disabled': + explanation = _('source volume id:%s is not' + ' replicated') % source_volid + raise exc.HTTPNotFound(explanation=explanation) + kwargs['source_replica'] = src_vol + except exception.NotFound: + explanation = (_('replica source volume id:%s not found') % + source_replica) + raise exc.HTTPNotFound(explanation=explanation) + else: + kwargs['source_replica'] = None + size = volume.get('size', None) if size is None and kwargs['snapshot'] is not None: size = kwargs['snapshot']['volume_size'] elif size is None and kwargs['source_volume'] is not None: size = kwargs['source_volume']['size'] + elif size is None and kwargs['source_replica'] is not None: + size = kwargs['source_replica']['size'] LOG.info(_("Create volume of %s GB"), size, context=context) diff --git a/cinder/common/config.py b/cinder/common/config.py index 9e9db35f7..2affb30ea 100644 --- a/cinder/common/config.py +++ b/cinder/common/config.py @@ -194,6 +194,9 @@ global_opts = [ help='Whether snapshots count against GigaByte quota'), cfg.StrOpt('transfer_api_class', default='cinder.transfer.api.API', - help='The full class name of the volume transfer API class'), ] + help='The full class name of the volume transfer API class'), + cfg.StrOpt('replication_api_class', + default='cinder.replication.api.API', + help='The full class name of the volume replication API class'), ] CONF.register_opts(global_opts) diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py b/cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py new file mode 100644 index 000000000..3401f862d --- /dev/null +++ b/cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py @@ -0,0 +1,51 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Column +from sqlalchemy import MetaData, String, Table + +from cinder.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +def upgrade(migrate_engine): + """Add replication columns to volumes.""" + meta = MetaData() + meta.bind = migrate_engine + + volumes = Table('volumes', meta, autoload=True) + replication_status = Column('replication_status', String(255)) + replication_extended_status = Column('replication_extended_status', + String(255)) + replication_driver_data = Column('replication_driver_data', String(255)) + volumes.create_column(replication_status) + volumes.create_column(replication_extended_status) + volumes.create_column(replication_driver_data) + volumes.update().values(replication_status='disabled', + replication_extended_status=None, + replication_driver_data=None).execute() + + +def downgrade(migrate_engine): + meta = MetaData() + meta.bind = migrate_engine + + volumes = Table('volumes', meta, autoload=True) + replication_status = volumes.columns.replication_status + replication_extended_status = volumes.columns.replication_extended_status + replication_driver_data = volumes.columns.replication_driver_data + volumes.drop_column(replication_status) + volumes.drop_column(replication_extended_status) + volumes.drop_column(replication_driver_data) diff --git a/cinder/db/sqlalchemy/models.py b/cinder/db/sqlalchemy/models.py index e2860cc1d..cc16784d0 100644 --- a/cinder/db/sqlalchemy/models.py +++ b/cinder/db/sqlalchemy/models.py @@ -119,6 +119,10 @@ class Volume(BASE, CinderBase): deleted = Column(Boolean, default=False) bootable = Column(Boolean, default=False) + replication_status = Column(String(255)) + replication_extended_status = Column(String(255)) + replication_driver_data = Column(String(255)) + class VolumeMetadata(BASE, CinderBase): """Represents a metadata key/value pair for a volume.""" diff --git a/cinder/exception.py b/cinder/exception.py index bb8c0e3cb..4721a4df2 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -586,6 +586,16 @@ class ManageExistingInvalidReference(CinderException): "reference %(existing_ref)s: %(reason)s") +class ReplicationError(CinderException): + message = _("Volume %(volume_id)s replication " + "error: %(reason)s") + + +class ReplicationNotFound(NotFound): + message = _("Volume replication for %(volume_id)s " + "could not be found.") + + class ManageExistingVolumeTypeMismatch(CinderException): message = _("Manage existing volume failed due to volume type mismatch: " "%(reason)s") diff --git a/cinder/replication/__init__.py b/cinder/replication/__init__.py new file mode 100644 index 000000000..99f16a32a --- /dev/null +++ b/cinder/replication/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo.config import cfg + +import cinder.openstack.common.importutils + + +CONF = cfg.CONF + +cls = CONF.replication_api_class +API = cinder.openstack.common.importutils.import_class(cls) diff --git a/cinder/replication/api.py b/cinder/replication/api.py new file mode 100644 index 000000000..97c19bf6a --- /dev/null +++ b/cinder/replication/api.py @@ -0,0 +1,114 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Handles all requests relating to volume replication. +""" +import functools + +from oslo.config import cfg + +from cinder.db import base +from cinder import exception +from cinder.i18n import _ +from cinder.openstack.common import log as logging +from cinder import policy +from cinder import volume as cinder_volume +from cinder.volume import rpcapi as volume_rpcapi +from cinder.volume import utils as volume_utils + +CONF = cfg.CONF + +LOG = logging.getLogger(__name__) + +PROMOTE_PROCEED_STATUS = ('active', 'active-stopped') +REENABLE_PROCEED_STATUS = ('inactive', 'active-stopped', 'error') + + +def wrap_check_policy(func): + """Check policy corresponding to the wrapped methods prior to execution. + + This decorator requires the first 3 args of the wrapped function + to be (self, context, relationship_id) + """ + @functools.wraps(func) + def wrapped(self, context, target_obj, *args, **kwargs): + check_policy(context, func.__name__, target_obj) + return func(self, context, target_obj, *args, **kwargs) + return wrapped + + +def check_policy(context, action, target_obj=None): + target = { + 'project_id': context.project_id, + 'user_id': context.user_id, + } + target.update(target_obj or {}) + _action = 'volume_extension:replication:%s' % action + policy.enforce(context, _action, target) + + +class API(base.Base): + """API for interacting with volume replication relationships.""" + + def __init__(self, db_driver=None): + super(API, self).__init__(db_driver) + self.volume_rpcapi = volume_rpcapi.VolumeAPI() + self.volume_api = cinder_volume.API() + + @wrap_check_policy + def promote(self, context, vol): + if vol['replication_status'] == 'disabled': + msg = _("Replication is not enabled for volume") + raise exception.ReplicationError( + reason=msg, + volume_id=vol['id']) + if vol['replication_status'] not in PROMOTE_PROCEED_STATUS: + msg = _("Replication status for volume must be active or " + "active-stopped, but current status " + "is: %s") % vol['replication_status'] + raise exception.ReplicationError( + reason=msg, + volume_id=vol['id']) + + if vol['status'] != 'available': + msg = _("Volume status for volume must be available, but current " + "status is: %s") % vol['status'] + raise exception.ReplicationError( + reason=msg, + volume_id=vol['id']) + volume_utils.notify_about_replication_usage(context, + vol, + 'promote') + self.volume_rpcapi.promote_replica(context, vol) + + @wrap_check_policy + def reenable(self, context, vol): + if vol['replication_status'] == 'disabled': + msg = _("Replication is not enabled") + raise exception.ReplicationError( + reason=msg, + volume_id=vol['id']) + if vol['replication_status'] not in REENABLE_PROCEED_STATUS: + msg = _("Replication status for volume must be inactive," + " active-stopped, or error, but current status " + "is: %s") % vol['replication_status'] + raise exception.ReplicationError( + reason=msg, + volume_id=vol['id']) + + volume_utils.notify_about_replication_usage(context, + vol, + 'sync') + self.volume_rpcapi.reenable_replication(context, vol) diff --git a/cinder/tests/api/contrib/test_admin_actions.py b/cinder/tests/api/contrib/test_admin_actions.py index df8ae7c25..c8c3a259f 100644 --- a/cinder/tests/api/contrib/test_admin_actions.py +++ b/cinder/tests/api/contrib/test_admin_actions.py @@ -589,6 +589,20 @@ class AdminActionsTest(test.TestCase): volume = self._migrate_volume_exec(ctx, volume, host, expected_status) self.assertEqual(volume['migration_status'], 'starting') + def test_migrate_volume_fail_replication(self): + expected_status = 400 + host = 'test2' + ctx = context.RequestContext('admin', 'fake', True) + volume = self._migrate_volume_prep() + # current status is available + volume = db.volume_create(ctx, + {'status': 'available', + 'host': 'test', + 'provider_location': '', + 'attach_status': '', + 'replication_status': 'active'}) + volume = self._migrate_volume_exec(ctx, volume, host, expected_status) + def test_migrate_volume_as_non_admin(self): expected_status = 403 host = 'test2' diff --git a/cinder/tests/api/contrib/test_volume_replication.py b/cinder/tests/api/contrib/test_volume_replication.py new file mode 100644 index 000000000..2c2d3cdb4 --- /dev/null +++ b/cinder/tests/api/contrib/test_volume_replication.py @@ -0,0 +1,246 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Tests for volume replication API code. +""" + +import json + +import mock +from oslo.config import cfg +import webob + +from cinder import context +from cinder import test +from cinder.tests.api import fakes +from cinder.tests import utils as tests_utils + +CONF = cfg.CONF + + +def app(): + # no auth, just let environ['cinder.context'] pass through + api = fakes.router.APIRouter() + mapper = fakes.urlmap.URLMap() + mapper['/v2'] = api + return mapper + + +class VolumeReplicationAPITestCase(test.TestCase): + """Test Cases for replication API.""" + + def setUp(self): + super(VolumeReplicationAPITestCase, self).setUp() + self.ctxt = context.RequestContext('admin', 'fake', True) + self.volume_params = { + 'host': CONF.host, + 'size': 1} + + def _get_resp(self, operation, volume_id, xml=False): + """Helper for a replication action req for the specified volume_id.""" + req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume_id) + req.method = 'POST' + if xml: + body = '' % operation + req.headers['Content-Type'] = 'application/xml' + req.headers['Accept'] = 'application/xml' + req.body = body + else: + body = {'os-%s-replica' % operation: ''} + req.headers['Content-Type'] = 'application/json' + req.body = json.dumps(body) + req.environ['cinder.context'] = context.RequestContext('admin', + 'fake', + True) + res = req.get_response(app()) + return req, res + + def test_promote_bad_id(self): + (req, res) = self._get_resp('promote', 'fake') + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 404, msg) + + def test_promote_bad_id_xml(self): + (req, res) = self._get_resp('promote', 'fake', xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 404, msg) + + def test_promote_volume_not_replicated(self): + volume = tests_utils.create_volume( + self.ctxt, + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + def test_promote_volume_not_replicated_xml(self): + volume = tests_utils.create_volume( + self.ctxt, + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica') + def test_promote_replication_volume_status(self, + _rpcapi_promote): + for status in ['error', 'in-use']: + volume = tests_utils.create_volume(self.ctxt, + status = status, + replication_status = 'active', + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + for status in ['available']: + volume = tests_utils.create_volume(self.ctxt, + status = status, + replication_status = 'active', + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 202, msg) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica') + def test_promote_replication_volume_status_xml(self, + _rpcapi_promote): + for status in ['error', 'in-use']: + volume = tests_utils.create_volume(self.ctxt, + status = status, + replication_status = 'active', + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + for status in ['available']: + volume = tests_utils.create_volume(self.ctxt, + status = status, + replication_status = 'active', + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 202, msg) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica') + def test_promote_replication_replication_status(self, + _rpcapi_promote): + for status in ['error', 'copying', 'inactive']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + for status in ['active', 'active-stopped']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 202, msg) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica') + def test_promote_replication_replication_status_xml(self, + _rpcapi_promote): + for status in ['error', 'copying', 'inactive']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + for status in ['active', 'active-stopped']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('promote', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 202, msg) + + def test_reenable_bad_id(self): + (req, res) = self._get_resp('reenable', 'fake') + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 404, msg) + + def test_reenable_bad_id_xml(self): + (req, res) = self._get_resp('reenable', 'fake', xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 404, msg) + + def test_reenable_volume_not_replicated(self): + volume = tests_utils.create_volume( + self.ctxt, + **self.volume_params) + (req, res) = self._get_resp('reenable', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + def test_reenable_volume_not_replicated_xml(self): + volume = tests_utils.create_volume( + self.ctxt, + **self.volume_params) + (req, res) = self._get_resp('reenable', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication') + def test_reenable_replication_replication_status(self, + _rpcapi_promote): + for status in ['active', 'copying']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('reenable', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + for status in ['inactive', 'active-stopped', 'error']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('reenable', volume['id']) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 202, msg) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication') + def test_reenable_replication_replication_status_xml(self, + _rpcapi_promote): + for status in ['active', 'copying']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('reenable', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 400, msg) + + for status in ['inactive', 'active-stopped', 'error']: + volume = tests_utils.create_volume(self.ctxt, + status = 'available', + replication_status = status, + **self.volume_params) + (req, res) = self._get_resp('reenable', volume['id'], xml=True) + msg = ("request: %s\nresult: %s" % (req, res)) + self.assertEqual(res.status_int, 202, msg) diff --git a/cinder/tests/api/v2/stubs.py b/cinder/tests/api/v2/stubs.py index 7b0d631ae..bb4a8419e 100644 --- a/cinder/tests/api/v2/stubs.py +++ b/cinder/tests/api/v2/stubs.py @@ -50,7 +50,10 @@ def stub_volume(id, **kwargs): {'key': 'readonly', 'value': 'False'}], 'bootable': False, 'launched_at': datetime.datetime(1, 1, 1, 1, 1, 1), - 'volume_type': {'name': 'vol_type_name'}} + 'volume_type': {'name': 'vol_type_name'}, + 'replication_status': 'disabled', + 'replication_extended_status': None, + 'replication_driver_data': None} volume.update(kwargs) if kwargs.get('volume_glance_metadata', None): diff --git a/cinder/tests/api/v2/test_volumes.py b/cinder/tests/api/v2/test_volumes.py index eb9736915..708a2e034 100644 --- a/cinder/tests/api/v2/test_volumes.py +++ b/cinder/tests/api/v2/test_volumes.py @@ -104,6 +104,7 @@ class VolumeApiTest(test.TestCase): 'rel': 'bookmark'}], 'metadata': {}, 'name': 'Volume Test Name', + 'replication_status': 'disabled', 'size': 100, 'snapshot_id': None, 'source_volid': None, @@ -207,6 +208,7 @@ class VolumeApiTest(test.TestCase): 'rel': 'bookmark'}], 'metadata': {}, 'name': 'Volume Test Name', + 'replication_status': 'disabled', 'size': '1', 'snapshot_id': None, 'source_volid': None, @@ -286,6 +288,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'Updated Test Name', + 'replication_status': 'disabled', 'attachments': [ { 'id': '1', @@ -338,6 +341,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'Updated Test Name', + 'replication_status': 'disabled', 'attachments': [ { 'id': '1', @@ -393,6 +397,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'New Name', + 'replication_status': 'disabled', 'attachments': [ { 'id': '1', @@ -443,6 +448,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'displayname', + 'replication_status': 'disabled', 'attachments': [{ 'id': '1', 'volume_id': '1', @@ -504,6 +510,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'Updated Test Name', + 'replication_status': 'disabled', 'attachments': [{ 'id': '1', 'volume_id': '1', @@ -607,6 +614,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'displayname', + 'replication_status': 'disabled', 'attachments': [ { 'device': '/', @@ -667,6 +675,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'displayname', + 'replication_status': 'disabled', 'attachments': [ { 'device': '/', @@ -1066,6 +1075,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'displayname', + 'replication_status': 'disabled', 'attachments': [ { 'device': '/', @@ -1115,6 +1125,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'displayname', + 'replication_status': 'disabled', 'attachments': [], 'user_id': 'fakeuser', 'volume_type': 'vol_type_name', @@ -1172,6 +1183,7 @@ class VolumeApiTest(test.TestCase): 'availability_zone': 'fakeaz', 'bootable': 'false', 'name': 'displayname', + 'replication_status': 'disabled', 'attachments': [ { 'device': '/', diff --git a/cinder/tests/policy.json b/cinder/tests/policy.json index 413be34f7..2c0b0ceaa 100644 --- a/cinder/tests/policy.json +++ b/cinder/tests/policy.json @@ -75,6 +75,9 @@ "backup:get_all": [], "backup:restore": [], "backup:backup-import": [["rule:admin_api"]], - "backup:backup-export": [["rule:admin_api"]] + "backup:backup-export": [["rule:admin_api"]], + + "volume_extension:replication:promote": [["rule:admin_api"]], + "volume_extension:replication:reenable": [["rule:admin_api"]] } diff --git a/cinder/tests/test_create_volume_flow.py b/cinder/tests/test_create_volume_flow.py index a4e4babd7..112fbe55e 100644 --- a/cinder/tests/test_create_volume_flow.py +++ b/cinder/tests/test_create_volume_flow.py @@ -42,12 +42,15 @@ class fake_volume_api(object): request_spec, filter_properties, allow_reschedule=True, snapshot_id=None, image_id=None, - source_volid=None): + source_volid=None, + source_replicaid=None): self.test_inst.assertEqual(self.expected_spec, request_spec) self.test_inst.assertEqual(request_spec['source_volid'], source_volid) self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id) self.test_inst.assertEqual(request_spec['image_id'], image_id) + self.test_inst.assertEqual(request_spec['source_replicaid'], + source_replicaid) class fake_db(object): @@ -84,7 +87,8 @@ class CreateVolumeFlowTestCase(test.TestCase): spec = {'volume_id': None, 'source_volid': None, 'snapshot_id': None, - 'image_id': None} + 'image_id': None, + 'source_replicaid': None} task = create_volume.VolumeCastTask( fake_scheduler_rpc_api(spec, self), @@ -96,7 +100,8 @@ class CreateVolumeFlowTestCase(test.TestCase): spec = {'volume_id': 1, 'source_volid': 2, 'snapshot_id': 3, - 'image_id': 4} + 'image_id': 4, + 'source_replicaid': 5} task = create_volume.VolumeCastTask( fake_scheduler_rpc_api(spec, self), diff --git a/cinder/tests/test_migrations.py b/cinder/tests/test_migrations.py index e3d3716ca..0068186a6 100644 --- a/cinder/tests/test_migrations.py +++ b/cinder/tests/test_migrations.py @@ -1093,3 +1093,36 @@ class TestMigrations(test.TestCase): autoload=True) index_names = [idx.name for idx in reservations.indexes] self.assertNotIn('reservations_deleted_expire_idx', index_names) + + def test_migration_024(self): + """Test adding replication columns to volume table.""" + for (key, engine) in self.engines.items(): + migration_api.version_control(engine, + TestMigrations.REPOSITORY, + migration.db_initial_version()) + migration_api.upgrade(engine, TestMigrations.REPOSITORY, 23) + metadata = sqlalchemy.schema.MetaData() + metadata.bind = engine + + migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24) + + volumes = sqlalchemy.Table('volumes', + metadata, + autoload=True) + self.assertIsInstance(volumes.c.replication_status.type, + sqlalchemy.types.VARCHAR) + self.assertIsInstance(volumes.c.replication_extended_status.type, + sqlalchemy.types.VARCHAR) + self.assertIsInstance(volumes.c.replication_driver_data.type, + sqlalchemy.types.VARCHAR) + + migration_api.downgrade(engine, TestMigrations.REPOSITORY, 23) + metadata = sqlalchemy.schema.MetaData() + metadata.bind = engine + + volumes = sqlalchemy.Table('volumes', + metadata, + autoload=True) + self.assertNotIn('replication_status', volumes.c) + self.assertNotIn('replication_extended_status', volumes.c) + self.assertNotIn('replication_driver_data', volumes.c) diff --git a/cinder/tests/test_replication.py b/cinder/tests/test_replication.py new file mode 100644 index 000000000..5ccec65fc --- /dev/null +++ b/cinder/tests/test_replication.py @@ -0,0 +1,111 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Tests for Volume replication code. +""" + +import mock +from oslo.config import cfg + +from cinder import context +from cinder import db +from cinder import exception +from cinder.openstack.common import importutils +from cinder import test +from cinder.tests import utils as test_utils + + +CONF = cfg.CONF + + +class VolumeReplicationTestCase(test.TestCase): + def setUp(self): + super(VolumeReplicationTestCase, self).setUp() + self.ctxt = context.RequestContext('user', 'fake', False) + self.adm_ctxt = context.RequestContext('admin', 'fake', True) + self.manager = importutils.import_object(CONF.volume_manager) + self.manager.host = 'test_host' + self.manager.stats = {'allocated_capacity_gb': 0} + self.driver_patcher = mock.patch.object(self.manager, 'driver') + self.driver = self.driver_patcher.start() + + @mock.patch('cinder.utils.require_driver_initialized') + def test_promote_replica_uninit_driver(self, _init): + """Test promote replication when driver is not initialized.""" + _init.side_effect = exception.DriverNotInitialized + vol = test_utils.create_volume(self.ctxt, + status='available', + replication_status='active') + self.driver.promote_replica.return_value = None + self.assertRaises(exception.DriverNotInitialized, + self.manager.promote_replica, + self.adm_ctxt, + vol['id']) + + def test_promote_replica(self): + """Test promote replication.""" + vol = test_utils.create_volume(self.ctxt, + status='available', + replication_status='active') + self.driver.promote_replica.return_value = \ + {'replication_status': 'inactive'} + self.manager.promote_replica(self.adm_ctxt, vol['id']) + vol_after = db.volume_get(self.ctxt, vol['id']) + self.assertEqual(vol_after['replication_status'], 'inactive') + + def test_promote_replica_fail(self): + """Test promote replication when promote fails.""" + vol = test_utils.create_volume(self.ctxt, + status='available', + replication_status='active') + self.driver.promote_replica.side_effect = exception.CinderException + self.assertRaises(exception.CinderException, + self.manager.promote_replica, + self.adm_ctxt, + vol['id']) + + def test_reenable_replication(self): + """Test reenable replication.""" + vol = test_utils.create_volume(self.ctxt, + status='available', + replication_status='error') + self.driver.reenable_replication.return_value = \ + {'replication_status': 'copying'} + self.manager.reenable_replication(self.adm_ctxt, vol['id']) + vol_after = db.volume_get(self.ctxt, vol['id']) + self.assertEqual(vol_after['replication_status'], 'copying') + + @mock.patch('cinder.utils.require_driver_initialized') + def test_reenable_replication_uninit_driver(self, _init): + """Test reenable replication when driver is not initialized.""" + _init.side_effect = exception.DriverNotInitialized + vol = test_utils.create_volume(self.ctxt, + status='available', + replication_status='error') + self.assertRaises(exception.DriverNotInitialized, + self.manager.reenable_replication, + self.adm_ctxt, + vol['id']) + + def test_reenable_replication_fail(self): + """Test promote replication when driver is not initialized.""" + vol = test_utils.create_volume(self.ctxt, + status='available', + replication_status='error') + self.driver.reenable_replication.side_effect = \ + exception.CinderException + self.assertRaises(exception.CinderException, + self.manager.reenable_replication, + self.adm_ctxt, + vol['id']) diff --git a/cinder/tests/test_volume.py b/cinder/tests/test_volume.py index 5c937d1ed..4a68f6df8 100644 --- a/cinder/tests/test_volume.py +++ b/cinder/tests/test_volume.py @@ -357,6 +357,9 @@ class VolumeTestCase(BaseVolumeTestCase): 'user_id': 'fake', 'launched_at': 'DONTCARE', 'size': 1, + 'replication_status': 'disabled', + 'replication_extended_status': None, + 'replication_driver_data': None, } self.assertDictMatch(msg['payload'], expected) msg = fake_notifier.NOTIFICATIONS[1] @@ -2445,6 +2448,28 @@ class VolumeTestCase(BaseVolumeTestCase): self.assertRaises(exception.CinderException, self.volume.create_volume, ctxt, volume_src['id']) + @mock.patch( + 'cinder.volume.driver.VolumeDriver.create_replica_test_volume') + def test_create_volume_from_sourcereplica(self, _create_replica_test): + """Test volume can be created from a volume replica.""" + _create_replica_test.return_value = None + + volume_src = tests_utils.create_volume(self.context, + **self.volume_params) + self.volume.create_volume(self.context, volume_src['id']) + volume_dst = tests_utils.create_volume(self.context, + source_replicaid= + volume_src['id'], + **self.volume_params) + self.volume.create_volume(self.context, volume_dst['id'], + source_replicaid=volume_src['id']) + self.assertEqual('available', + db.volume_get(context.get_admin_context(), + volume_dst['id']).status) + self.assertTrue(_create_replica_test.called) + self.volume.delete_volume(self.context, volume_dst['id']) + self.volume.delete_volume(self.context, volume_src['id']) + def test_create_volume_from_sourcevol(self): """Test volume can be created from a source volume.""" def fake_create_cloned_volume(volume, src_vref): @@ -2706,7 +2731,8 @@ class VolumeTestCase(BaseVolumeTestCase): self.assertEqual(volume['status'], 'available') def _retype_volume_exec(self, driver, snap=False, policy='on-demand', - migrate_exc=False, exc=None, diff_equal=False): + migrate_exc=False, exc=None, diff_equal=False, + replica=False): elevated = context.get_admin_context() project_id = self.context.project_id @@ -2716,9 +2742,14 @@ class VolumeTestCase(BaseVolumeTestCase): vol_type = db.volume_type_get_by_name(elevated, 'new') db.quota_create(elevated, project_id, 'volumes_new', 10) + if replica: + rep_status = 'active' + else: + rep_status = 'disabled' volume = tests_utils.create_volume(self.context, size=1, host=CONF.host, status='retyping', - volume_type_id=old_vol_type['id']) + volume_type_id=old_vol_type['id'], + replication_status=rep_status) if snap: self._create_snapshot(volume['id'], size=volume['size']) if driver or diff_equal: @@ -2789,6 +2820,11 @@ class VolumeTestCase(BaseVolumeTestCase): self._retype_volume_exec(False, policy='never', exc=exception.VolumeMigrationFailed) + def test_retype_volume_migration_with_replica(self): + self._retype_volume_exec(False, + replica=True, + exc=exception.InvalidVolume) + def test_retype_volume_migration_with_snaps(self): self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume) diff --git a/cinder/tests/test_volume_rpcapi.py b/cinder/tests/test_volume_rpcapi.py index 51078b65a..ae1518998 100644 --- a/cinder/tests/test_volume_rpcapi.py +++ b/cinder/tests/test_volume_rpcapi.py @@ -146,6 +146,7 @@ class VolumeRpcAPITestCase(test.TestCase): snapshot_id='fake_snapshot_id', image_id='fake_image_id', source_volid='fake_src_id', + source_replicaid='fake_replica_id', version='1.4') def test_create_volume_serialization(self): @@ -160,6 +161,7 @@ class VolumeRpcAPITestCase(test.TestCase): snapshot_id='fake_snapshot_id', image_id='fake_image_id', source_volid='fake_src_id', + source_replicaid='fake_replica_id', version='1.4') def test_delete_volume(self): @@ -288,3 +290,15 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume, ref={'lv_name': 'foo'}, version='1.15') + + def test_promote_replica(self): + self._test_volume_api('promote_replica', + rpc_method='cast', + volume=self.fake_volume, + version='1.17') + + def test_reenable_replica(self): + self._test_volume_api('reenable_replication', + rpc_method='cast', + volume=self.fake_volume, + version='1.17') diff --git a/cinder/tests/utils.py b/cinder/tests/utils.py index 5262a7a54..a11f9d823 100644 --- a/cinder/tests/utils.py +++ b/cinder/tests/utils.py @@ -31,6 +31,9 @@ def create_volume(ctxt, size=1, availability_zone='fake_az', volume_type_id=None, + replication_status='disabled', + replication_extended_status=None, + replication_driver_data=None, **kwargs): """Create a volume object in the DB.""" vol = {} @@ -48,6 +51,10 @@ def create_volume(ctxt, vol['volume_type_id'] = volume_type_id for key in kwargs: vol[key] = kwargs[key] + vol['replication_status'] = replication_status + vol['replication_extended_status'] = replication_extended_status + vol['replication_driver_data'] = replication_driver_data + return db.volume_create(ctxt, vol) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 11ef072ee..06cc22217 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -152,7 +152,8 @@ class API(base.Base): def create(self, context, size, name, description, snapshot=None, image_id=None, volume_type=None, metadata=None, availability_zone=None, source_volume=None, - scheduler_hints=None, backup_source_volume=None): + scheduler_hints=None, backup_source_volume=None, + source_replica=None): if source_volume and volume_type: if volume_type['id'] != source_volume['volume_type_id']: @@ -161,6 +162,12 @@ class API(base.Base): "You should omit the argument.") raise exception.InvalidInput(reason=msg) + # When cloning replica (for testing), volume type must be omitted + if source_replica and volume_type: + msg = _("No volume_type should be provided when creating test " + "replica, type must be omitted.") + raise exception.InvalidInput(reason=msg) + if snapshot and volume_type: if volume_type['id'] != snapshot['volume_type_id']: msg = _("Invalid volume_type provided (requested type " @@ -190,6 +197,7 @@ class API(base.Base): 'scheduler_hints': scheduler_hints, 'key_manager': self.key_manager, 'backup_source_volume': backup_source_volume, + 'source_replica': source_replica, 'optional_args': {'is_quota_committed': False} } try: @@ -475,6 +483,11 @@ class API(base.Base): msg = _("Snapshot cannot be created while volume is migrating") raise exception.InvalidVolume(reason=msg) + if volume['status'].startswith('replica_'): + # Can't snapshot secondary replica + msg = _("Snapshot of secondary replica is not allowed.") + raise exception.InvalidVolume(reason=msg) + if ((not force) and (volume['status'] != "available")): msg = _("must be available") raise exception.InvalidVolume(reason=msg) @@ -839,6 +852,13 @@ class API(base.Base): LOG.error(msg) raise exception.InvalidVolume(reason=msg) + # We only handle non-replicated volumes for now + rep_status = volume['replication_status'] + if rep_status is not None and rep_status != 'disabled': + msg = _("Volume must not be replicated.") + LOG.error(msg) + raise exception.InvalidVolume(reason=msg) + # Make sure the host is in the list of available hosts elevated = context.elevated() topic = CONF.volume_topic diff --git a/cinder/volume/driver.py b/cinder/volume/driver.py index 0085eb350..64374d361 100644 --- a/cinder/volume/driver.py +++ b/cinder/volume/driver.py @@ -278,19 +278,57 @@ class VolumeDriver(object): def create_volume(self, volume): """Creates a volume. Can optionally return a Dictionary of changes to the volume object to be persisted. + + If volume_type extra specs includes + 'capabilities:replication True' the driver + needs to create a volume replica (secondary), and setup replication + between the newly created volume and the secondary volume. + Returned dictionary should include: + volume['replication_status'] = 'copying' + volume['replication_extended_status'] = driver specific value + volume['driver_data'] = driver specific value + """ raise NotImplementedError() def create_volume_from_snapshot(self, volume, snapshot): - """Creates a volume from a snapshot.""" + """Creates a volume from a snapshot. + + If volume_type extra specs includes 'replication: True' + the driver needs to create a volume replica (secondary), + and setup replication between the newly created volume and + the secondary volume. + """ + raise NotImplementedError() def create_cloned_volume(self, volume, src_vref): - """Creates a clone of the specified volume.""" + """Creates a clone of the specified volume. + + If volume_type extra specs includes 'replication: True' the + driver needs to create a volume replica (secondary) + and setup replication between the newly created volume + and the secondary volume. + + """ + + raise NotImplementedError() + + def create_replica_test_volume(self, volume, src_vref): + """Creates a test replica clone of the specified replicated volume. + + Create a clone of the replicated (secondary) volume. + + """ raise NotImplementedError() def delete_volume(self, volume): - """Deletes a volume.""" + """Deletes a volume. + + If volume_type extra specs includes 'replication: True' + then the driver needs to delete the volume replica too. + + """ raise NotImplementedError() def create_snapshot(self, snapshot): @@ -307,6 +345,10 @@ class VolumeDriver(object): def get_volume_stats(self, refresh=False): """Return the current state of the volume service. If 'refresh' is True, run the update first. + + For replication the following state should be reported: + replication_support = True (None or false disables replication) + """ return None @@ -547,7 +589,24 @@ class VolumeDriver(object): def retype(self, context, volume, new_type, diff, host): """Convert the volume to be of the new type. - Returns a boolean indicating whether the retype occurred. + Returns either: + A boolean indicating whether the retype occurred, or + A tuple (retyped, model_update) where retyped is a boolean + indicating if the retype occurred, and the model_update includes + changes for the volume db. + if diff['extra_specs'] includes 'replication' then: + if ('True', _ ) then replication should be disabled: + Volume replica should be deleted + volume['replication_status'] should be changed to 'disabled' + volume['replication_extended_status'] = None + volume['replication_driver_data'] = None + if (_, 'True') then replication should be enabled: + Volume replica (secondary) should be created, and replication + should be setup between the volume and the newly created + replica + volume['replication_status'] = 'copying' + volume['replication_extended_status'] = driver specific value + volume['replication_driver_data'] = driver specific value :param ctxt: Context :param volume: A dictionary describing the volume to migrate @@ -557,7 +616,7 @@ class VolumeDriver(object): host['host'] is its name, and host['capabilities'] is a dictionary of its reported capabilities. """ - return False + return False, None def accept_transfer(self, context, volume, new_user, new_project): """Accept the transfer of a volume for a new user/project.""" @@ -635,6 +694,82 @@ class VolumeDriver(object): def validate_connector_has_setting(connector, setting): pass + def reenable_replication(self, context, volume): + """Re-enable replication between the replica and primary volume. + + This is used to re-enable/fix the replication between primary + and secondary. One use is as part of the fail-back process, when + you re-synchorize your old primary with the promoted volume + (the old replica). + Returns model_update for the volume to reflect the actions of the + driver. + The driver is expected to update the following entries: + 'replication_status' + 'replication_extended_status' + 'replication_driver_data' + Possible 'replication_status' values (in model_update) are: + 'error' - replication in error state + 'copying' - replication copying data to secondary (inconsistent) + 'active' - replication copying data to secondary (consistent) + 'active-stopped' - replication data copy on hold (consistent) + 'inactive' - replication data copy on hold (inconsistent) + Values in 'replication_extended_status' and 'replication_driver_data' + are managed by the driver. + + :param context: Context + :param volume: A dictionary describing the volume + + """ + msg = _("sync_replica not implemented.") + raise NotImplementedError(msg) + + def get_replication_status(self, context, volume): + """Query the actual volume replication status from the driver. + + Returns model_update for the volume. + The driver is expected to update the following entries: + 'replication_status' + 'replication_extended_status' + 'replication_driver_data' + Possible 'replication_status' values (in model_update) are: + 'error' - replication in error state + 'copying' - replication copying data to secondary (inconsistent) + 'active' - replication copying data to secondary (consistent) + 'active-stopped' - replication data copy on hold (consistent) + 'inactive' - replication data copy on hold (inconsistent) + Values in 'replication_extended_status' and 'replication_driver_data' + are managed by the driver. + + :param context: Context + :param volume: A dictionary describing the volume + """ + return None + + def promote_replica(self, context, volume): + """Promote the replica to be the primary volume. + + Following this command, replication between the volumes at + the storage level should be stopped, the replica should be + available to be attached, and the replication status should + be in status 'inactive'. + + Returns model_update for the volume. + The driver is expected to update the following entries: + 'replication_status' + 'replication_extended_status' + 'replication_driver_data' + Possible 'replication_status' values (in model_update) are: + 'error' - replication in error state + 'inactive' - replication data copy on hold (inconsistent) + Values in 'replication_extended_status' and 'replication_driver_data' + are managed by the driver. + + :param context: Context + :param volume: A dictionary describing the volume + """ + msg = _("promote_replica not implemented.") + raise NotImplementedError(msg) + # ####### Interface methods for DataPath (Connector) ######## def ensure_export(self, context, volume): """Synchronously recreates an export for a volume.""" diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py index f65aa1f70..639d60b22 100644 --- a/cinder/volume/flows/api/create_volume.py +++ b/cinder/volume/flows/api/create_volume.py @@ -40,6 +40,7 @@ QUOTAS = quota.QUOTAS # from, 'error' being the common example. SNAPSHOT_PROCEED_STATUS = ('available',) SRC_VOL_PROCEED_STATUS = ('available', 'in-use',) +REPLICA_PROCEED_STATUS = ('active', 'active-stopped') class ExtractVolumeRequestTask(flow_utils.CinderTask): @@ -58,7 +59,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): # reconstructed elsewhere and continued). default_provides = set(['availability_zone', 'size', 'snapshot_id', 'source_volid', 'volume_type', 'volume_type_id', - 'encryption_key_id']) + 'encryption_key_id', 'source_replicaid']) def __init__(self, image_service, availability_zones, **kwargs): super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION], @@ -111,6 +112,38 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): source_volid = source_volume['id'] return source_volid + @staticmethod + def _extract_source_replica(source_replica): + """Extracts the volume id from the provided replica (if provided). + + This function validates the input replica_volume dict and checks that + the status of that replica_volume is valid for creating a volume from. + """ + + source_replicaid = None + if source_replica is not None: + if source_replica['status'] not in SRC_VOL_PROCEED_STATUS: + msg = _("Unable to create a volume from an originating source" + " volume when its status is not one of %s" + " values") + msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS)) + # TODO(harlowja): what happens if the status changes after this + # initial volume status check occurs??? Seems like someone + # could delete the volume after this check passes but before + # the volume is officially created? + raise exception.InvalidVolume(reason=msg) + replication_status = source_replica['replication_status'] + if replication_status not in REPLICA_PROCEED_STATUS: + msg = _("Unable to create a volume from a replica" + " when replication status is not one of %s" + " values") + msg = msg % (", ".join(REPLICA_PROCEED_STATUS)) + # TODO(ronenkat): what happens if the replication status + # changes after this initial volume status check occurs??? + raise exception.InvalidVolume(reason=msg) + source_replicaid = source_replica['id'] + return source_replicaid + @staticmethod def _extract_size(size, source_volume, snapshot): """Extracts and validates the volume size. @@ -330,7 +363,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): def execute(self, context, size, snapshot, image_id, source_volume, availability_zone, volume_type, metadata, - key_manager, backup_source_volume): + key_manager, backup_source_volume, source_replica): utils.check_exclusive_options(snapshot=snapshot, imageRef=image_id, @@ -341,6 +374,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): # volume will remain available after we do this initial verification?? snapshot_id = self._extract_snapshot(snapshot) source_volid = self._extract_source_volume(source_volume) + source_replicaid = self._extract_source_replica(source_replica) size = self._extract_size(size, source_volume, snapshot) self._check_image_metadata(context, image_id, size) @@ -354,8 +388,15 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): # should copy encryption metadata from the encrypted volume type to the # volume upon creation and propagate that information to each snapshot. # This strategy avoid any dependency upon the encrypted volume type. + def_vol_type = volume_types.get_default_volume_type() if not volume_type and not source_volume and not snapshot: - volume_type = volume_types.get_default_volume_type() + volume_type = def_vol_type + + # When creating a clone of a replica (replication test), we can't + # use the volume type of the replica, therefore, we use the default. + # NOTE(ronenkat): this assumes the default type is not replicated. + if source_replicaid: + volume_type = def_vol_type volume_type_id = self._get_volume_type_id(volume_type, source_volume, snapshot, @@ -387,6 +428,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask): 'volume_type_id': volume_type_id, 'encryption_key_id': encryption_key_id, 'qos_specs': specs, + 'source_replicaid': source_replicaid, } @@ -401,7 +443,8 @@ class EntryCreateTask(flow_utils.CinderTask): def __init__(self, db): requires = ['availability_zone', 'description', 'metadata', 'name', 'reservations', 'size', 'snapshot_id', - 'source_volid', 'volume_type_id', 'encryption_key_id'] + 'source_volid', 'volume_type_id', 'encryption_key_id', + 'source_replicaid'] super(EntryCreateTask, self).__init__(addons=[ACTION], requires=requires) self.db = db @@ -427,6 +470,7 @@ class EntryCreateTask(flow_utils.CinderTask): # Rename these to the internal name. 'display_description': kwargs.pop('description'), 'display_name': kwargs.pop('name'), + 'replication_status': 'disabled', } # Merge in the other required arguments which should provide the rest @@ -612,7 +656,7 @@ class VolumeCastTask(flow_utils.CinderTask): def __init__(self, scheduler_rpcapi, volume_rpcapi, db): requires = ['image_id', 'scheduler_hints', 'snapshot_id', 'source_volid', 'volume_id', 'volume_type', - 'volume_properties'] + 'volume_properties', 'source_replicaid'] super(VolumeCastTask, self).__init__(addons=[ACTION], requires=requires) self.volume_rpcapi = volume_rpcapi @@ -621,6 +665,7 @@ class VolumeCastTask(flow_utils.CinderTask): def _cast_create_volume(self, context, request_spec, filter_properties): source_volid = request_spec['source_volid'] + source_replicaid = request_spec['source_replicaid'] volume_id = request_spec['volume_id'] snapshot_id = request_spec['snapshot_id'] image_id = request_spec['image_id'] @@ -639,6 +684,9 @@ class VolumeCastTask(flow_utils.CinderTask): elif source_volid: source_volume_ref = self.db.volume_get(context, source_volid) host = source_volume_ref['host'] + elif source_replicaid: + source_volume_ref = self.db.volume_get(context, source_replicaid) + host = source_volume_ref['host'] if not host: # Cast to the scheduler and let it handle whatever is needed @@ -666,7 +714,8 @@ class VolumeCastTask(flow_utils.CinderTask): allow_reschedule=False, snapshot_id=snapshot_id, image_id=image_id, - source_volid=source_volid) + source_volid=source_volid, + source_replicaid=source_replicaid) def execute(self, context, **kwargs): scheduler_hints = kwargs.pop('scheduler_hints', None) diff --git a/cinder/volume/flows/manager/create_volume.py b/cinder/volume/flows/manager/create_volume.py index a8cac6c44..8dc2f50fb 100644 --- a/cinder/volume/flows/manager/create_volume.py +++ b/cinder/volume/flows/manager/create_volume.py @@ -208,7 +208,8 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask): default_provides = 'volume_spec' def __init__(self, db): - requires = ['image_id', 'snapshot_id', 'source_volid'] + requires = ['image_id', 'snapshot_id', 'source_volid', + 'source_replicaid'] super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION], requires=requires) self.db = db @@ -254,6 +255,18 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask): 'source_volstatus': source_volume_ref['status'], 'type': 'source_vol', }) + elif kwargs.get('source_replicaid'): + # We are making a clone based on the replica. + # + # NOTE(harlowja): This will likely fail if the replica + # disappeared by the time this call occurred. + source_volid = kwargs['source_replicaid'] + source_volume_ref = self.db.volume_get(context, source_volid) + specs.update({ + 'source_replicaid': source_volid, + 'source_replicastatus': source_volume_ref['status'], + 'type': 'source_replica', + }) elif kwargs.get('image_id'): # We are making an image based volume instead of a raw volume. image_href = kwargs['image_id'] @@ -363,6 +376,17 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): context, source_volid, volume_id) + elif kwargs.get('source_replicaid'): + src_type = 'source replica' + src_id = kwargs['source_replicaid'] + source_replicaid = src_id + LOG.debug(log_template % {'src_type': src_type, + 'src_id': src_id, + 'vol_id': volume_id}) + self.db.volume_glance_metadata_copy_from_volume_to_volume( + context, + source_replicaid, + volume_id) elif kwargs.get('image_id'): src_type = 'image' src_id = kwargs['image_id'] @@ -432,6 +456,27 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): source_volid=source_volid) return model_update + def _create_from_source_replica(self, context, volume_ref, + source_replicaid, **kwargs): + # NOTE(harlowja): if the source volume has disappeared this will be our + # detection of that since this database call should fail. + # + # NOTE(harlowja): likely this is not the best place for this to happen + # and we should have proper locks on the source volume while actions + # that use the source volume are underway. + srcvol_ref = self.db.volume_get(context, source_replicaid) + model_update = self.driver.create_replica_test_volume(volume_ref, + srcvol_ref) + # NOTE(harlowja): Subtasks would be useful here since after this + # point the volume has already been created and further failures + # will not destroy the volume (although they could in the future). + if srcvol_ref.bootable: + self._handle_bootable_volume_glance_meta( + context, + volume_ref['id'], + source_replicaid=source_replicaid) + return model_update + def _copy_image_to_volume(self, context, volume_ref, image_id, image_location, image_service): """Downloads Glance image to the specified volume.""" @@ -588,6 +633,9 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask): elif create_type == 'source_vol': model_update = self._create_from_source_volume( context, volume_ref=volume_ref, **volume_spec) + elif create_type == 'source_replica': + model_update = self._create_from_source_replica( + context, volume_ref=volume_ref, **volume_spec) elif create_type == 'image': model_update = self._create_from_image(context, volume_ref=volume_ref, @@ -661,7 +709,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask): def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id, allow_reschedule, reschedule_context, request_spec, filter_properties, snapshot_id=None, image_id=None, - source_volid=None): + source_volid=None, source_replicaid=None): """Constructs and returns the manager entrypoint flow. This flow will do the following: @@ -691,6 +739,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id, 'snapshot_id': snapshot_id, 'source_volid': source_volid, 'volume_id': volume_id, + 'source_replicaid': source_replicaid, } volume_flow.add(ExtractVolumeRefTask(db, host)) diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 6251fc22c..d81f90acf 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -151,7 +151,7 @@ def locked_snapshot_operation(f): class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" - RPC_API_VERSION = '1.16' + RPC_API_VERSION = '1.17' target = messaging.Target(version=RPC_API_VERSION) @@ -273,7 +273,8 @@ class VolumeManager(manager.SchedulerDependentManager): def create_volume(self, context, volume_id, request_spec=None, filter_properties=None, allow_reschedule=True, - snapshot_id=None, image_id=None, source_volid=None): + snapshot_id=None, image_id=None, source_volid=None, + source_replicaid=None): """Creates the volume.""" context_saved = context.deepcopy() @@ -294,6 +295,7 @@ class VolumeManager(manager.SchedulerDependentManager): snapshot_id=snapshot_id, image_id=image_id, source_volid=source_volid, + source_replicaid=source_replicaid, allow_reschedule=allow_reschedule, reschedule_context=context_saved, request_spec=request_spec, @@ -301,7 +303,7 @@ class VolumeManager(manager.SchedulerDependentManager): except Exception: LOG.exception(_("Failed to create manager volume flow")) raise exception.CinderException( - _("Failed to create manager volume flow")) + _("Failed to create manager volume flow.")) if snapshot_id is not None: # Make sure the snapshot is not deleted until we are done with it. @@ -309,6 +311,9 @@ class VolumeManager(manager.SchedulerDependentManager): elif source_volid is not None: # Make sure the volume is not deleted until we are done with it. locked_action = "%s-%s" % (source_volid, 'delete_volume') + elif source_replicaid is not None: + # Make sure the volume is not deleted until we are done with it. + locked_action = "%s-%s" % (source_replicaid, 'delete_volume') else: locked_action = None @@ -1263,11 +1268,22 @@ class VolumeManager(manager.SchedulerDependentManager): retyped = True # Call driver to try and change the type + retype_model_update = None if not retyped: try: new_type = volume_types.get_volume_type(context, new_type_id) - retyped = self.driver.retype(context, volume_ref, new_type, - diff, host) + ret = self.driver.retype(context, + volume_ref, + new_type, + diff, + host) + # Check if the driver retype provided a model update or + # just a retype indication + if type(ret) == tuple: + retyped, retype_model_update = ret + else: + retyped = ret + if retyped: LOG.info(_("Volume %s: retyped successfully"), volume_id) except Exception as ex: @@ -1294,6 +1310,16 @@ class VolumeManager(manager.SchedulerDependentManager): msg = _("Volume must not have snapshots.") LOG.error(msg) raise exception.InvalidVolume(reason=msg) + + # Don't allow volume with replicas to be migrated + rep_status = volume_ref['replication_status'] + if rep_status is not None and rep_status != 'disabled': + _retype_error(context, volume_id, old_reservations, + new_reservations, status_update) + msg = _("Volume must not be replicated.") + LOG.error(msg) + raise exception.InvalidVolume(reason=msg) + self.db.volume_update(context, volume_ref['id'], {'migration_status': 'starting'}) @@ -1305,10 +1331,12 @@ class VolumeManager(manager.SchedulerDependentManager): _retype_error(context, volume_id, old_reservations, new_reservations, status_update) else: - self.db.volume_update(context, volume_id, - {'volume_type_id': new_type_id, - 'host': host['host'], - 'status': status_update['status']}) + model_update = {'volume_type_id': new_type_id, + 'host': host['host'], + 'status': status_update['status']} + if retype_model_update: + model_update.update(retype_model_update) + self.db.volume_update(context, volume_id, model_update) if old_reservations: QUOTAS.commit(context, old_reservations, project_id=project_id) @@ -1317,7 +1345,7 @@ class VolumeManager(manager.SchedulerDependentManager): self.publish_service_capabilities(context) def manage_existing(self, ctxt, volume_id, ref=None): - LOG.debug('manage_existing: managing %s' % ref) + LOG.debug('manage_existing: managing %s.' % ref) try: flow_engine = manage_existing.get_flow( ctxt, @@ -1339,3 +1367,96 @@ class VolumeManager(manager.SchedulerDependentManager): # Update volume stats self.stats['allocated_capacity_gb'] += volume_ref['size'] return volume_ref['id'] + + def promote_replica(self, ctxt, volume_id): + """Promote volume replica secondary to be the primary volume.""" + try: + utils.require_driver_initialized(self.driver) + except exception.DriverNotInitialized: + with excutils.save_and_reraise_exception(): + LOG.exception(_("Failed to promote replica for volume %(id)s.") + % {'id': volume_id}) + + volume = self.db.volume_get(ctxt, volume_id) + model_update = None + try: + LOG.debug("Volume %s: promote replica.", volume_id) + model_update = self.driver.promote_replica(ctxt, volume) + except exception.CinderException: + err_msg = (_('Error promoting secondary volume to primary')) + raise exception.ReplicationError(reason=err_msg, + volume_id=volume_id) + + try: + if model_update: + volume = self.db.volume_update(ctxt, + volume_id, + model_update) + except exception.CinderException: + err_msg = (_("Failed updating model" + " with driver provided model %(model)s") % + {'model': model_update}) + raise exception.ReplicationError(reason=err_msg, + volume_id=volume_id) + + def reenable_replication(self, ctxt, volume_id): + """Re-enable replication of secondary volume with primary volumes.""" + try: + utils.require_driver_initialized(self.driver) + except exception.DriverNotInitialized: + with excutils.save_and_reraise_exception(): + LOG.exception(_("Failed to sync replica for volume %(id)s.") + % {'id': volume_id}) + + volume = self.db.volume_get(ctxt, volume_id) + model_update = None + try: + LOG.debug("Volume %s: sync replica.", volume_id) + model_update = self.driver.reenable_replication(ctxt, volume) + except exception.CinderException: + err_msg = (_('Error synchronizing secondary volume to primary')) + raise exception.ReplicationError(reason=err_msg, + volume_id=volume_id) + + try: + if model_update: + volume = self.db.volume_update(ctxt, + volume_id, + model_update) + except exception.CinderException: + err_msg = (_("Failed updating model" + " with driver provided model %(model)s") % + {'model': model_update}) + raise exception.ReplicationError(reason=err_msg, + volume_id=volume_id) + + @periodic_task.periodic_task + def _update_replication_relationship_status(self, ctxt): + LOG.info(_('Updating volume replication status.')) + if not self.driver.initialized: + if self.driver.configuration.config_group is None: + config_group = '' + else: + config_group = ('(config name %s)' % + self.driver.configuration.config_group) + + LOG.warning(_('Unable to update volume replication status, ' + '%(driver_name)s -%(driver_version)s ' + '%(config_group)s driver is uninitialized.') % + {'driver_name': self.driver.__class__.__name__, + 'driver_version': self.driver.get_version(), + 'config_group': config_group}) + else: + volumes = self.db.volume_get_all_by_host(ctxt, self.host) + for vol in volumes: + model_update = None + try: + model_update = self.driver.get_replication_status( + ctxt, vol) + if model_update: + self.db.volume_update(ctxt, + vol['id'], + model_update) + except Exception: + LOG.exception(_("Error checking replication status for " + "volume %s") % vol['id']) diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 5d251c9fd..63c10ed98 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -51,6 +51,8 @@ class VolumeAPI(object): 1.14 - Adds reservation parameter to extend_volume(). 1.15 - Adds manage_existing and unmanage_only flag to delete_volume. 1.16 - Removes create_export. + 1.17 - Add replica option to create_volume, promote_replica and + sync_replica. ''' BASE_RPC_API_VERSION = '1.0' @@ -59,12 +61,13 @@ class VolumeAPI(object): super(VolumeAPI, self).__init__() target = messaging.Target(topic=CONF.volume_topic, version=self.BASE_RPC_API_VERSION) - self.client = rpc.get_client(target, '1.15') + self.client = rpc.get_client(target, '1.17') def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True, snapshot_id=None, image_id=None, + source_replicaid=None, source_volid=None): cctxt = self.client.prepare(server=host, version='1.4') @@ -76,6 +79,7 @@ class VolumeAPI(object): allow_reschedule=allow_reschedule, snapshot_id=snapshot_id, image_id=image_id, + source_replicaid=source_replicaid, source_volid=source_volid), def delete_volume(self, ctxt, volume, unmanage_only=False): @@ -165,3 +169,11 @@ class VolumeAPI(object): def manage_existing(self, ctxt, volume, ref): cctxt = self.client.prepare(server=volume['host'], version='1.15') cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref) + + def promote_replica(self, ctxt, volume): + cctxt = self.client.prepare(server=volume['host'], version='1.17') + cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id']) + + def reenable_replication(self, ctxt, volume): + cctxt = self.client.prepare(server=volume['host'], version='1.17') + cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id']) diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index 7395965a0..37c17db2e 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -54,7 +54,13 @@ def _usage_from_volume(context, volume_ref, **kw): created_at=null_safe_str(volume_ref['created_at']), status=volume_ref['status'], snapshot_id=volume_ref['snapshot_id'], - size=volume_ref['size']) + size=volume_ref['size'], + replication_status=volume_ref['replication_status'], + replication_extended_status= + volume_ref['replication_extended_status'], + replication_driver_data= + volume_ref['replication_driver_data'], + ) usage_info.update(kw) return usage_info @@ -107,6 +113,40 @@ def notify_about_snapshot_usage(context, snapshot, event_suffix, usage_info) +def notify_about_replication_usage(context, volume, suffix, + extra_usage_info=None, host=None): + if not host: + host = CONF.host + + if not extra_usage_info: + extra_usage_info = {} + + usage_info = _usage_from_volume(context, + volume, + **extra_usage_info) + + rpc.get_notifier('replication', host).info(context, + 'replication.%s' % suffix, + usage_info) + + +def notify_about_replication_error(context, volume, suffix, + extra_error_info=None, host=None): + if not host: + host = CONF.host + + if not extra_error_info: + extra_error_info = {} + + usage_info = _usage_from_volume(context, + volume, + **extra_error_info) + + rpc.get_notifier('replication', host).error(context, + 'replication.%s' % suffix, + usage_info) + + def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute): if not bps_limit: LOG.debug('Not using bps rate limiting on volume copy') diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample index 7cc9b4168..5c06c4eff 100644 --- a/etc/cinder/cinder.conf.sample +++ b/etc/cinder/cinder.conf.sample @@ -609,6 +609,10 @@ # value) #transfer_api_class=cinder.transfer.api.API +# The full class name of the volume replication API class +# (string value) +#replication_api_class=cinder.replication.api.API + # # Options defined in cinder.compute diff --git a/etc/cinder/policy.json b/etc/cinder/policy.json index 73b3bc423..3ec7fb484 100644 --- a/etc/cinder/policy.json +++ b/etc/cinder/policy.json @@ -52,6 +52,9 @@ "volume:delete_transfer": [], "volume:get_all_transfers": [], + "volume_extension:replication:promote": ["rule:admin_api"], + "volume_extension:replication:reenable": ["rule:admin_api"], + "backup:create" : [], "backup:delete": [], "backup:get": [], -- 2.45.2