review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add support in Cinder for volume replication - driver approach
author    Ronen Kat <ronenkat@il.ibm.com>
          Sat, 26 Jul 2014 14:06:52 +0000 (17:06 +0300)
committer Ronen Kat <ronenkat@il.ibm.com>
          Sat, 23 Aug 2014 15:53:55 +0000 (15:53 +0000)
This is take #2 for managing replication in Cinder.

This patch provides the foundation in Cinder to make volume
replication available to the cloud admin. It makes Cinder aware
of volume replicas, and allows the cloud admin to define storage
policies (volume types) that will enable replication.

In this version Cinder delegates most of the replication work to the
driver itself.

This includes:
1. Drivers expose replication capabilities via a volume type convention.
2. The volume table is extended with columns to support replication.
3. Replicas are created by the driver, making them transparent to Cinder.
4. Volume manager code handles the API, and create_volume is updated to
   support creating test replicas.
5. Driver methods expose the replication functions.

Cinder-specs available at https://review.openstack.org/#/c/98308/

Volume replication use-case: Simplified disaster recovery
The OpenStack cloud is deployed across two metro-distance data centers.
Storage backends are available in both data centers. The backends
are managed by either a single Cinder host or two, depending on the
storage backend requirements.
The storage admin configures the Cinder volume driver to support
replication.
The cloud admin creates a volume type "replicated" with the extra-spec:
   capabilities:replication="<is> True"
Every volume created with type "replicated" has a copy on both
backends.
If the first data center fails, the cloud admin promotes the replica
and redeploys the VMs; they will then run on hosts in the secondary
data center, using the storage in the secondary data center.
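
For illustration only (not part of this patch), here is a minimal sketch of
that admin workflow using python-cinderclient; the credentials, endpoint,
names, and sizes below are placeholders, and the available client calls
should be checked against the installed client version.

    # Sketch: create a replicated volume type and a volume of that type.
    from cinderclient.v2 import client as cinder_client

    # Placeholder admin credentials and auth endpoint.
    cinder = cinder_client.Client('admin', 'secret', 'admin_tenant',
                                  'http://keystone:5000/v2.0')

    # Volume type carrying the replication capability convention above.
    vtype = cinder.volume_types.create('replicated')
    vtype.set_keys({'capabilities:replication': '<is> True'})

    # The driver is expected to create and pair the secondary replica.
    vol = cinder.volumes.create(10, name='protected-vol',
                                volume_type='replicated')

Promotion and re-synchronization are exposed as the os-promote-replica and
os-reenable-replica volume actions added by this patch (see the API
extension diff below).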

Implements: blueprint volume-replication
DocImpact

Change-Id: I964852f08b500400a27bff99e5200386e00643c9

29 files changed:
cinder/api/contrib/volume_replication.py [new file with mode: 0644]
cinder/api/v2/views/volumes.py
cinder/api/v2/volumes.py
cinder/common/config.py
cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py [new file with mode: 0644]
cinder/db/sqlalchemy/models.py
cinder/exception.py
cinder/replication/__init__.py [new file with mode: 0644]
cinder/replication/api.py [new file with mode: 0644]
cinder/tests/api/contrib/test_admin_actions.py
cinder/tests/api/contrib/test_volume_replication.py [new file with mode: 0644]
cinder/tests/api/v2/stubs.py
cinder/tests/api/v2/test_volumes.py
cinder/tests/policy.json
cinder/tests/test_create_volume_flow.py
cinder/tests/test_migrations.py
cinder/tests/test_replication.py [new file with mode: 0644]
cinder/tests/test_volume.py
cinder/tests/test_volume_rpcapi.py
cinder/tests/utils.py
cinder/volume/api.py
cinder/volume/driver.py
cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py
cinder/volume/rpcapi.py
cinder/volume/utils.py
etc/cinder/cinder.conf.sample
etc/cinder/policy.json

diff --git a/cinder/api/contrib/volume_replication.py b/cinder/api/contrib/volume_replication.py
new file mode 100644 (file)
index 0000000..78756b6
--- /dev/null
@@ -0,0 +1,139 @@
+# Copyright 2014 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import webob
+from webob import exc
+
+from cinder.api import extensions
+from cinder.api.openstack import wsgi
+from cinder.api import xmlutil
+from cinder import exception
+from cinder.i18n import _
+from cinder.openstack.common import log as logging
+from cinder import replication as replicationAPI
+from cinder import volume
+
+LOG = logging.getLogger(__name__)
+
+authorize = extensions.soft_extension_authorizer('volume',
+                                                 'volume_replication')
+
+
+class VolumeReplicationController(wsgi.Controller):
+    """The Volume Replication API controller for the Openstack API."""
+
+    def __init__(self, *args, **kwargs):
+        super(VolumeReplicationController, self).__init__(*args, **kwargs)
+        self.volume_api = volume.API()
+        self.replication_api = replicationAPI.API()
+
+    def _add_replication_attributes(self, req, context, resp_volume):
+        db_volume = req.cached_resource_by_id(resp_volume['id'])
+        key = "%s:extended_status" % Volume_replication.alias
+        resp_volume[key] = db_volume['replication_extended_status']
+        key = "%s:driver_data" % Volume_replication.alias
+        resp_volume[key] = db_volume['replication_driver_data']
+
+    @wsgi.extends
+    def show(self, req, resp_obj, id):
+        context = req.environ['cinder.context']
+        if authorize(context):
+            resp_obj.attach(xml=VolumeReplicationAttributeTemplate())
+            self._add_replication_attributes(req, context,
+                                             resp_obj.obj['volume'])
+
+    @wsgi.extends
+    def detail(self, req, resp_obj):
+        context = req.environ['cinder.context']
+        if authorize(context):
+            resp_obj.attach(xml=VolumeReplicationListAttributeTemplate())
+            for vol in list(resp_obj.obj['volumes']):
+                self._add_replication_attributes(req, context, vol)
+
+    @wsgi.response(202)
+    @wsgi.action('os-promote-replica')
+    def promote(self, req, id, body):
+        context = req.environ['cinder.context']
+        try:
+            vol = self.volume_api.get(context, id)
+            LOG.info(_('Attempting to promote secondary replica to primary'
+                       ' for volume %s.'),
+                     str(id),
+                     context=context)
+            self.replication_api.promote(context, vol)
+        except exception.NotFound:
+            msg = _("Volume could not be found")
+            raise exc.HTTPNotFound(explanation=msg)
+        except exception.ReplicationError as error:
+            raise exc.HTTPBadRequest(explanation=unicode(error))
+        return webob.Response(status_int=202)
+
+    @wsgi.response(202)
+    @wsgi.action('os-reenable-replica')
+    def reenable(self, req, id, body):
+        context = req.environ['cinder.context']
+        try:
+            vol = self.volume_api.get(context, id)
+            LOG.info(_('Attempting to sync secondary replica with primary'
+                       ' for volume %s.'),
+                     str(id),
+                     context=context)
+            self.replication_api.reenable(context, vol)
+        except exception.NotFound:
+            msg = _("Volume could not be found")
+            raise exc.HTTPNotFound(explanation=msg)
+        except exception.ReplicationError as error:
+            raise exc.HTTPBadRequest(explanation=unicode(error))
+        return webob.Response(status_int=202)
+
+
+class Volume_replication(extensions.ExtensionDescriptor):
+    """Volume replication management support."""
+
+    name = "VolumeReplication"
+    alias = "os-volume-replication"
+    namespace = "http://docs.openstack.org/volume/ext/volume_replication/" + \
+                "api/v1"
+    updated = "2014-08-01T00:00:00+00:00"
+
+    def get_controller_extensions(self):
+        controller = VolumeReplicationController()
+        extension = extensions.ControllerExtension(self, 'volumes', controller)
+        return [extension]
+
+
+def make_volume(elem):
+    elem.set('{%s}extended_status' % Volume_replication.namespace,
+             '%s:extended_status' % Volume_replication.alias)
+    elem.set('{%s}driver_data' % Volume_replication.namespace,
+             '%s:driver_data' % Volume_replication.alias)
+
+
+class VolumeReplicationAttributeTemplate(xmlutil.TemplateBuilder):
+    def construct(self):
+        root = xmlutil.TemplateElement('volume', selector='volume')
+        make_volume(root)
+        alias = Volume_replication.alias
+        namespace = Volume_replication.namespace
+        return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace})
+
+
+class VolumeReplicationListAttributeTemplate(xmlutil.TemplateBuilder):
+    def construct(self):
+        root = xmlutil.TemplateElement('volumes')
+        elem = xmlutil.SubTemplateElement(root, 'volume', selector='volumes')
+        make_volume(elem)
+        alias = Volume_replication.alias
+        namespace = Volume_replication.namespace
+        return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace})
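
For reference (illustrative, not part of the patch), the new actions can be
driven directly over the v2 REST API; the request body mirrors the format
used by the tests later in this change, while the endpoint, tenant, volume
ID, and token are placeholders.

    # Sketch: invoke os-promote-replica / os-reenable-replica on a volume.
    import json
    import requests

    ENDPOINT = 'http://cinder-api:8776/v2/<tenant_id>'   # placeholder
    HEADERS = {'X-Auth-Token': '<admin-token>',          # placeholder
               'Content-Type': 'application/json'}

    def replica_action(volume_id, action):
        # action is 'os-promote-replica' or 'os-reenable-replica'.
        # The controller above answers 202 on success, 404 for an unknown
        # volume, and 400 when the replication API raises ReplicationError.
        url = '%s/volumes/%s/action' % (ENDPOINT, volume_id)
        resp = requests.post(url, headers=HEADERS,
                             data=json.dumps({action: ''}))
        return resp.status_code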
index 9a034d327444e2975abcce2b0e2f971eb43b3eb7..d19eb04dda8163037911da568ce6728cae320b4b 100644 (file)
@@ -68,7 +68,8 @@ class ViewBuilder(common.ViewBuilder):
                 'links': self._get_links(request, volume['id']),
                 'user_id': volume.get('user_id'),
                 'bootable': str(volume.get('bootable')).lower(),
-                'encrypted': self._is_volume_encrypted(volume)
+                'encrypted': self._is_volume_encrypted(volume),
+                'replication_status': volume.get('replication_status')
             }
         }
 
index 9c6f17da9ced141cb6efb8990c7b6ee19e4fe820..9cb980ba911e167d77509a6bfb3289c0aa8c5cdd 100644 (file)
@@ -326,11 +326,30 @@ class VolumeController(wsgi.Controller):
         else:
             kwargs['source_volume'] = None
 
+        source_replica = volume.get('source_replica')
+        if source_replica is not None:
+            try:
+                src_vol = self.volume_api.get_volume(context,
+                                                     source_replica)
+                if src_vol['replication_status'] == 'disabled':
+                    explanation = _('source volume id:%s is not'
+                                    ' replicated') % source_replica
+                    raise exc.HTTPNotFound(explanation=explanation)
+                kwargs['source_replica'] = src_vol
+            except exception.NotFound:
+                explanation = (_('replica source volume id:%s not found') %
+                               source_replica)
+                raise exc.HTTPNotFound(explanation=explanation)
+        else:
+            kwargs['source_replica'] = None
+
         size = volume.get('size', None)
         if size is None and kwargs['snapshot'] is not None:
             size = kwargs['snapshot']['volume_size']
         elif size is None and kwargs['source_volume'] is not None:
             size = kwargs['source_volume']['size']
+        elif size is None and kwargs['source_replica'] is not None:
+            size = kwargs['source_replica']['size']
 
         LOG.info(_("Create volume of %s GB"), size, context=context)
 
index 9e9db35f75a32095eb81b1e4ad316cb9be93524f..2affb30eac63cbe2b30c2a5134ea4f84f3109b22 100644 (file)
@@ -194,6 +194,9 @@ global_opts = [
                 help='Whether snapshots count against GigaByte quota'),
     cfg.StrOpt('transfer_api_class',
                default='cinder.transfer.api.API',
-               help='The full class name of the volume transfer API class'), ]
+               help='The full class name of the volume transfer API class'),
+    cfg.StrOpt('replication_api_class',
+               default='cinder.replication.api.API',
+               help='The full class name of the volume replication API class'), ]
 
 CONF.register_opts(global_opts)
diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py b/cinder/db/sqlalchemy/migrate_repo/versions/024_add_replication_support.py
new file mode 100644 (file)
index 0000000..3401f86
--- /dev/null
@@ -0,0 +1,51 @@
+# Copyright 2014 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from sqlalchemy import Column
+from sqlalchemy import MetaData, String, Table
+
+from cinder.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def upgrade(migrate_engine):
+    """Add replication columns to volumes."""
+    meta = MetaData()
+    meta.bind = migrate_engine
+
+    volumes = Table('volumes', meta, autoload=True)
+    replication_status = Column('replication_status', String(255))
+    replication_extended_status = Column('replication_extended_status',
+                                         String(255))
+    replication_driver_data = Column('replication_driver_data', String(255))
+    volumes.create_column(replication_status)
+    volumes.create_column(replication_extended_status)
+    volumes.create_column(replication_driver_data)
+    volumes.update().values(replication_status='disabled',
+                            replication_extended_status=None,
+                            replication_driver_data=None).execute()
+
+
+def downgrade(migrate_engine):
+    meta = MetaData()
+    meta.bind = migrate_engine
+
+    volumes = Table('volumes', meta, autoload=True)
+    replication_status = volumes.columns.replication_status
+    replication_extended_status = volumes.columns.replication_extended_status
+    replication_driver_data = volumes.columns.replication_driver_data
+    volumes.drop_column(replication_status)
+    volumes.drop_column(replication_extended_status)
+    volumes.drop_column(replication_driver_data)
index e2860cc1d0a11555b11e17502e632ddcb45ca7ca..cc16784d0324c6adfd08e63101c01cc2468b4dca 100644 (file)
@@ -119,6 +119,10 @@ class Volume(BASE, CinderBase):
     deleted = Column(Boolean, default=False)
     bootable = Column(Boolean, default=False)
 
+    replication_status = Column(String(255))
+    replication_extended_status = Column(String(255))
+    replication_driver_data = Column(String(255))
+
 
 class VolumeMetadata(BASE, CinderBase):
     """Represents a metadata key/value pair for a volume."""
index bb8c0e3cb2d30536aa9ff6b46b3aa64d315557ea..4721a4df286186a6164b0147cd27f4bd7bfedd5c 100644 (file)
@@ -586,6 +586,16 @@ class ManageExistingInvalidReference(CinderException):
                 "reference %(existing_ref)s: %(reason)s")
 
 
+class ReplicationError(CinderException):
+    message = _("Volume %(volume_id)s replication "
+                "error: %(reason)s")
+
+
+class ReplicationNotFound(NotFound):
+    message = _("Volume replication for %(volume_id)s "
+                "could not be found.")
+
+
 class ManageExistingVolumeTypeMismatch(CinderException):
     message = _("Manage existing volume failed due to volume type mismatch: "
                 "%(reason)s")
diff --git a/cinder/replication/__init__.py b/cinder/replication/__init__.py
new file mode 100644 (file)
index 0000000..99f16a3
--- /dev/null
@@ -0,0 +1,24 @@
+# Copyright 2014 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from oslo.config import cfg
+
+import cinder.openstack.common.importutils
+
+
+CONF = cfg.CONF
+
+cls = CONF.replication_api_class
+API = cinder.openstack.common.importutils.import_class(cls)
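
Usage note (a sketch under the default configuration): the module imports
whatever class replication_api_class points at, so callers obtain the
replication API exactly as the extension controller above does.

    # Resolves to cinder.replication.api.API unless overridden in cinder.conf.
    from cinder import replication

    replication_api = replication.API()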
diff --git a/cinder/replication/api.py b/cinder/replication/api.py
new file mode 100644 (file)
index 0000000..97c19bf
--- /dev/null
@@ -0,0 +1,114 @@
+# Copyright 2014 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Handles all requests relating to volume replication.
+"""
+import functools
+
+from oslo.config import cfg
+
+from cinder.db import base
+from cinder import exception
+from cinder.i18n import _
+from cinder.openstack.common import log as logging
+from cinder import policy
+from cinder import volume as cinder_volume
+from cinder.volume import rpcapi as volume_rpcapi
+from cinder.volume import utils as volume_utils
+
+CONF = cfg.CONF
+
+LOG = logging.getLogger(__name__)
+
+PROMOTE_PROCEED_STATUS = ('active', 'active-stopped')
+REENABLE_PROCEED_STATUS = ('inactive', 'active-stopped', 'error')
+
+
+def wrap_check_policy(func):
+    """Check policy corresponding to the wrapped methods prior to execution.
+
+    This decorator requires the first 3 args of the wrapped function
+    to be (self, context, relationship_id)
+    """
+    @functools.wraps(func)
+    def wrapped(self, context, target_obj, *args, **kwargs):
+        check_policy(context, func.__name__, target_obj)
+        return func(self, context, target_obj, *args, **kwargs)
+    return wrapped
+
+
+def check_policy(context, action, target_obj=None):
+    target = {
+        'project_id': context.project_id,
+        'user_id': context.user_id,
+    }
+    target.update(target_obj or {})
+    _action = 'volume_extension:replication:%s' % action
+    policy.enforce(context, _action, target)
+
+
+class API(base.Base):
+    """API for interacting with volume replication relationships."""
+
+    def __init__(self, db_driver=None):
+        super(API, self).__init__(db_driver)
+        self.volume_rpcapi = volume_rpcapi.VolumeAPI()
+        self.volume_api = cinder_volume.API()
+
+    @wrap_check_policy
+    def promote(self, context, vol):
+        if vol['replication_status'] == 'disabled':
+            msg = _("Replication is not enabled for volume")
+            raise exception.ReplicationError(
+                reason=msg,
+                volume_id=vol['id'])
+        if vol['replication_status'] not in PROMOTE_PROCEED_STATUS:
+            msg = _("Replication status for volume must be active or "
+                    "active-stopped, but current status "
+                    "is: %s") % vol['replication_status']
+            raise exception.ReplicationError(
+                reason=msg,
+                volume_id=vol['id'])
+
+        if vol['status'] != 'available':
+            msg = _("Volume status for volume must be available, but current "
+                    "status is: %s") % vol['status']
+            raise exception.ReplicationError(
+                reason=msg,
+                volume_id=vol['id'])
+        volume_utils.notify_about_replication_usage(context,
+                                                    vol,
+                                                    'promote')
+        self.volume_rpcapi.promote_replica(context, vol)
+
+    @wrap_check_policy
+    def reenable(self, context, vol):
+        if vol['replication_status'] == 'disabled':
+            msg = _("Replication is not enabled")
+            raise exception.ReplicationError(
+                reason=msg,
+                volume_id=vol['id'])
+        if vol['replication_status'] not in REENABLE_PROCEED_STATUS:
+            msg = _("Replication status for volume must be inactive,"
+                    " active-stopped, or error, but current status "
+                    "is: %s") % vol['replication_status']
+            raise exception.ReplicationError(
+                reason=msg,
+                volume_id=vol['id'])
+
+        volume_utils.notify_about_replication_usage(context,
+                                                    vol,
+                                                    'sync')
+        self.volume_rpcapi.reenable_replication(context, vol)
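
A short sketch of the status gating enforced above, with a hypothetical
caller (the helper name and logging are illustrative): promote only
proceeds for an 'available' volume whose replication status is 'active' or
'active-stopped'; anything else raises ReplicationError.

    import logging

    from cinder import exception
    from cinder import replication

    LOG = logging.getLogger(__name__)

    def try_promote(context, volume):
        # Returns True when the promote cast was issued, False otherwise.
        try:
            replication.API().promote(context, volume)
            return True
        except exception.ReplicationError as err:
            LOG.warning("Promote refused for volume %s: %s",
                        volume['id'], err)
            return False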
index df8ae7c25eec4ed359d228e64ba24d8a8f53938a..c8c3a259f84417393bda874a1dda40acd162fb34 100644 (file)
@@ -589,6 +589,20 @@ class AdminActionsTest(test.TestCase):
         volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
         self.assertEqual(volume['migration_status'], 'starting')
 
+    def test_migrate_volume_fail_replication(self):
+        expected_status = 400
+        host = 'test2'
+        ctx = context.RequestContext('admin', 'fake', True)
+        volume = self._migrate_volume_prep()
+        # current status is available
+        volume = db.volume_create(ctx,
+                                  {'status': 'available',
+                                   'host': 'test',
+                                   'provider_location': '',
+                                   'attach_status': '',
+                                   'replication_status': 'active'})
+        volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
+
     def test_migrate_volume_as_non_admin(self):
         expected_status = 403
         host = 'test2'
diff --git a/cinder/tests/api/contrib/test_volume_replication.py b/cinder/tests/api/contrib/test_volume_replication.py
new file mode 100644 (file)
index 0000000..2c2d3cd
--- /dev/null
@@ -0,0 +1,246 @@
+# Copyright 2014 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Tests for volume replication API code.
+"""
+
+import json
+
+import mock
+from oslo.config import cfg
+import webob
+
+from cinder import context
+from cinder import test
+from cinder.tests.api import fakes
+from cinder.tests import utils as tests_utils
+
+CONF = cfg.CONF
+
+
+def app():
+    # no auth, just let environ['cinder.context'] pass through
+    api = fakes.router.APIRouter()
+    mapper = fakes.urlmap.URLMap()
+    mapper['/v2'] = api
+    return mapper
+
+
+class VolumeReplicationAPITestCase(test.TestCase):
+    """Test Cases for replication API."""
+
+    def setUp(self):
+        super(VolumeReplicationAPITestCase, self).setUp()
+        self.ctxt = context.RequestContext('admin', 'fake', True)
+        self.volume_params = {
+            'host': CONF.host,
+            'size': 1}
+
+    def _get_resp(self, operation, volume_id, xml=False):
+        """Helper for a replication action req for the specified volume_id."""
+        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume_id)
+        req.method = 'POST'
+        if xml:
+            body = '<os-%s-replica/>' % operation
+            req.headers['Content-Type'] = 'application/xml'
+            req.headers['Accept'] = 'application/xml'
+            req.body = body
+        else:
+            body = {'os-%s-replica' % operation: ''}
+            req.headers['Content-Type'] = 'application/json'
+            req.body = json.dumps(body)
+        req.environ['cinder.context'] = context.RequestContext('admin',
+                                                               'fake',
+                                                               True)
+        res = req.get_response(app())
+        return req, res
+
+    def test_promote_bad_id(self):
+        (req, res) = self._get_resp('promote', 'fake')
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 404, msg)
+
+    def test_promote_bad_id_xml(self):
+        (req, res) = self._get_resp('promote', 'fake', xml=True)
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 404, msg)
+
+    def test_promote_volume_not_replicated(self):
+        volume = tests_utils.create_volume(
+            self.ctxt,
+            **self.volume_params)
+        (req, res) = self._get_resp('promote', volume['id'])
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 400, msg)
+
+    def test_promote_volume_not_replicated_xml(self):
+        volume = tests_utils.create_volume(
+            self.ctxt,
+            **self.volume_params)
+        (req, res) = self._get_resp('promote', volume['id'], xml=True)
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 400, msg)
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
+    def test_promote_replication_volume_status(self,
+                                               _rpcapi_promote):
+        for status in ['error', 'in-use']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status=status,
+                                               replication_status='active',
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'])
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 400, msg)
+
+        for status in ['available']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status=status,
+                                               replication_status='active',
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'])
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 202, msg)
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
+    def test_promote_replication_volume_status_xml(self,
+                                                   _rpcapi_promote):
+        for status in ['error', 'in-use']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status=status,
+                                               replication_status='active',
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'], xml=True)
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 400, msg)
+
+        for status in ['available']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status=status,
+                                               replication_status='active',
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'], xml=True)
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 202, msg)
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
+    def test_promote_replication_replication_status(self,
+                                                    _rpcapi_promote):
+        for status in ['error', 'copying', 'inactive']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'])
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 400, msg)
+
+        for status in ['active', 'active-stopped']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'])
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 202, msg)
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
+    def test_promote_replication_replication_status_xml(self,
+                                                        _rpcapi_promote):
+        for status in ['error', 'copying', 'inactive']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'], xml=True)
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 400, msg)
+
+        for status in ['active', 'active-stopped']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('promote', volume['id'], xml=True)
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 202, msg)
+
+    def test_reenable_bad_id(self):
+        (req, res) = self._get_resp('reenable', 'fake')
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 404, msg)
+
+    def test_reenable_bad_id_xml(self):
+        (req, res) = self._get_resp('reenable', 'fake', xml=True)
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 404, msg)
+
+    def test_reenable_volume_not_replicated(self):
+        volume = tests_utils.create_volume(
+            self.ctxt,
+            **self.volume_params)
+        (req, res) = self._get_resp('reenable', volume['id'])
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 400, msg)
+
+    def test_reenable_volume_not_replicated_xml(self):
+        volume = tests_utils.create_volume(
+            self.ctxt,
+            **self.volume_params)
+        (req, res) = self._get_resp('reenable', volume['id'], xml=True)
+        msg = ("request: %s\nresult: %s" % (req, res))
+        self.assertEqual(res.status_int, 400, msg)
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
+    def test_reenable_replication_replication_status(self,
+                                                     _rpcapi_promote):
+        for status in ['active', 'copying']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('reenable', volume['id'])
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 400, msg)
+
+        for status in ['inactive', 'active-stopped', 'error']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('reenable', volume['id'])
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 202, msg)
+
+    @mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
+    def test_reenable_replication_replication_status_xml(self,
+                                                         _rpcapi_promote):
+        for status in ['active', 'copying']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('reenable', volume['id'], xml=True)
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 400, msg)
+
+        for status in ['inactive', 'active-stopped', 'error']:
+            volume = tests_utils.create_volume(self.ctxt,
+                                               status='available',
+                                               replication_status=status,
+                                               **self.volume_params)
+            (req, res) = self._get_resp('reenable', volume['id'], xml=True)
+            msg = ("request: %s\nresult: %s" % (req, res))
+            self.assertEqual(res.status_int, 202, msg)
index 7b0d631ae0e2ac472e391998dcdb8aaad38fb396..bb4a8419e515c861e8f100f64f2940212012b0b6 100644 (file)
@@ -50,7 +50,10 @@ def stub_volume(id, **kwargs):
                                   {'key': 'readonly', 'value': 'False'}],
         'bootable': False,
         'launched_at': datetime.datetime(1, 1, 1, 1, 1, 1),
-        'volume_type': {'name': 'vol_type_name'}}
+        'volume_type': {'name': 'vol_type_name'},
+        'replication_status': 'disabled',
+        'replication_extended_status': None,
+        'replication_driver_data': None}
 
     volume.update(kwargs)
     if kwargs.get('volume_glance_metadata', None):
index eb973691575516bdec5aba50c9c8ee2186d6db38..708a2e034fb00f87f64f9451370ca7f39f586471 100644 (file)
@@ -104,6 +104,7 @@ class VolumeApiTest(test.TestCase):
                            'rel': 'bookmark'}],
                          'metadata': {},
                          'name': 'Volume Test Name',
+                         'replication_status': 'disabled',
                          'size': 100,
                          'snapshot_id': None,
                          'source_volid': None,
@@ -207,6 +208,7 @@ class VolumeApiTest(test.TestCase):
                            'rel': 'bookmark'}],
                          'metadata': {},
                          'name': 'Volume Test Name',
+                         'replication_status': 'disabled',
                          'size': '1',
                          'snapshot_id': None,
                          'source_volid': None,
@@ -286,6 +288,7 @@ class VolumeApiTest(test.TestCase):
                 'availability_zone': 'fakeaz',
                 'bootable': 'false',
                 'name': 'Updated Test Name',
+                'replication_status': 'disabled',
                 'attachments': [
                     {
                         'id': '1',
@@ -338,6 +341,7 @@ class VolumeApiTest(test.TestCase):
                 'availability_zone': 'fakeaz',
                 'bootable': 'false',
                 'name': 'Updated Test Name',
+                'replication_status': 'disabled',
                 'attachments': [
                     {
                         'id': '1',
@@ -393,6 +397,7 @@ class VolumeApiTest(test.TestCase):
                 'availability_zone': 'fakeaz',
                 'bootable': 'false',
                 'name': 'New Name',
+                'replication_status': 'disabled',
                 'attachments': [
                     {
                         'id': '1',
@@ -443,6 +448,7 @@ class VolumeApiTest(test.TestCase):
             'availability_zone': 'fakeaz',
             'bootable': 'false',
             'name': 'displayname',
+            'replication_status': 'disabled',
             'attachments': [{
                 'id': '1',
                 'volume_id': '1',
@@ -504,6 +510,7 @@ class VolumeApiTest(test.TestCase):
             'availability_zone': 'fakeaz',
             'bootable': 'false',
             'name': 'Updated Test Name',
+            'replication_status': 'disabled',
             'attachments': [{
                 'id': '1',
                 'volume_id': '1',
@@ -607,6 +614,7 @@ class VolumeApiTest(test.TestCase):
                     'availability_zone': 'fakeaz',
                     'bootable': 'false',
                     'name': 'displayname',
+                    'replication_status': 'disabled',
                     'attachments': [
                         {
                             'device': '/',
@@ -667,6 +675,7 @@ class VolumeApiTest(test.TestCase):
                     'availability_zone': 'fakeaz',
                     'bootable': 'false',
                     'name': 'displayname',
+                    'replication_status': 'disabled',
                     'attachments': [
                         {
                             'device': '/',
@@ -1066,6 +1075,7 @@ class VolumeApiTest(test.TestCase):
                 'availability_zone': 'fakeaz',
                 'bootable': 'false',
                 'name': 'displayname',
+                'replication_status': 'disabled',
                 'attachments': [
                     {
                         'device': '/',
@@ -1115,6 +1125,7 @@ class VolumeApiTest(test.TestCase):
                 'availability_zone': 'fakeaz',
                 'bootable': 'false',
                 'name': 'displayname',
+                'replication_status': 'disabled',
                 'attachments': [],
                 'user_id': 'fakeuser',
                 'volume_type': 'vol_type_name',
@@ -1172,6 +1183,7 @@ class VolumeApiTest(test.TestCase):
                 'availability_zone': 'fakeaz',
                 'bootable': 'false',
                 'name': 'displayname',
+                'replication_status': 'disabled',
                 'attachments': [
                     {
                         'device': '/',
index 413be34f728553fce66521ea8a673ecd4753cb1d..2c0b0ceaabf9c4f20a77709741439c788e5a291b 100644 (file)
@@ -75,6 +75,9 @@
     "backup:get_all": [],
     "backup:restore": [],
     "backup:backup-import": [["rule:admin_api"]],
-    "backup:backup-export": [["rule:admin_api"]]
+    "backup:backup-export": [["rule:admin_api"]],
+
+    "volume_extension:replication:promote": [["rule:admin_api"]],
+    "volume_extension:replication:reenable": [["rule:admin_api"]]
 
 }
index a4e4babd76e865fa91ed2bd59eef273bc82b3f74..112fbe55e4bb9219f431eea3c585f48536185420 100644 (file)
@@ -42,12 +42,15 @@ class fake_volume_api(object):
                       request_spec, filter_properties,
                       allow_reschedule=True,
                       snapshot_id=None, image_id=None,
-                      source_volid=None):
+                      source_volid=None,
+                      source_replicaid=None):
 
         self.test_inst.assertEqual(self.expected_spec, request_spec)
         self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
         self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
         self.test_inst.assertEqual(request_spec['image_id'], image_id)
+        self.test_inst.assertEqual(request_spec['source_replicaid'],
+                                   source_replicaid)
 
 
 class fake_db(object):
@@ -84,7 +87,8 @@ class CreateVolumeFlowTestCase(test.TestCase):
         spec = {'volume_id': None,
                 'source_volid': None,
                 'snapshot_id': None,
-                'image_id': None}
+                'image_id': None,
+                'source_replicaid': None}
 
         task = create_volume.VolumeCastTask(
             fake_scheduler_rpc_api(spec, self),
@@ -96,7 +100,8 @@ class CreateVolumeFlowTestCase(test.TestCase):
         spec = {'volume_id': 1,
                 'source_volid': 2,
                 'snapshot_id': 3,
-                'image_id': 4}
+                'image_id': 4,
+                'source_replicaid': 5}
 
         task = create_volume.VolumeCastTask(
             fake_scheduler_rpc_api(spec, self),
index e3d3716ca1d930fc5a07969241e9f290592c32ad..0068186a65deff7688192a7184fa04aa27d72e4d 100644 (file)
@@ -1093,3 +1093,36 @@ class TestMigrations(test.TestCase):
                                             autoload=True)
             index_names = [idx.name for idx in reservations.indexes]
             self.assertNotIn('reservations_deleted_expire_idx', index_names)
+
+    def test_migration_024(self):
+        """Test adding replication columns to volume table."""
+        for (key, engine) in self.engines.items():
+            migration_api.version_control(engine,
+                                          TestMigrations.REPOSITORY,
+                                          migration.db_initial_version())
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 23)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
+
+            volumes = sqlalchemy.Table('volumes',
+                                       metadata,
+                                       autoload=True)
+            self.assertIsInstance(volumes.c.replication_status.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(volumes.c.replication_extended_status.type,
+                                  sqlalchemy.types.VARCHAR)
+            self.assertIsInstance(volumes.c.replication_driver_data.type,
+                                  sqlalchemy.types.VARCHAR)
+
+            migration_api.downgrade(engine, TestMigrations.REPOSITORY, 23)
+            metadata = sqlalchemy.schema.MetaData()
+            metadata.bind = engine
+
+            volumes = sqlalchemy.Table('volumes',
+                                       metadata,
+                                       autoload=True)
+            self.assertNotIn('replication_status', volumes.c)
+            self.assertNotIn('replication_extended_status', volumes.c)
+            self.assertNotIn('replication_driver_data', volumes.c)
diff --git a/cinder/tests/test_replication.py b/cinder/tests/test_replication.py
new file mode 100644 (file)
index 0000000..5ccec65
--- /dev/null
@@ -0,0 +1,111 @@
+# Copyright 2014 IBM Corp.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests for Volume replication code.
+"""
+
+import mock
+from oslo.config import cfg
+
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common import importutils
+from cinder import test
+from cinder.tests import utils as test_utils
+
+
+CONF = cfg.CONF
+
+
+class VolumeReplicationTestCase(test.TestCase):
+    def setUp(self):
+        super(VolumeReplicationTestCase, self).setUp()
+        self.ctxt = context.RequestContext('user', 'fake', False)
+        self.adm_ctxt = context.RequestContext('admin', 'fake', True)
+        self.manager = importutils.import_object(CONF.volume_manager)
+        self.manager.host = 'test_host'
+        self.manager.stats = {'allocated_capacity_gb': 0}
+        self.driver_patcher = mock.patch.object(self.manager, 'driver')
+        self.driver = self.driver_patcher.start()
+
+    @mock.patch('cinder.utils.require_driver_initialized')
+    def test_promote_replica_uninit_driver(self, _init):
+        """Test promote replication when driver is not initialized."""
+        _init.side_effect = exception.DriverNotInitialized
+        vol = test_utils.create_volume(self.ctxt,
+                                       status='available',
+                                       replication_status='active')
+        self.driver.promote_replica.return_value = None
+        self.assertRaises(exception.DriverNotInitialized,
+                          self.manager.promote_replica,
+                          self.adm_ctxt,
+                          vol['id'])
+
+    def test_promote_replica(self):
+        """Test promote replication."""
+        vol = test_utils.create_volume(self.ctxt,
+                                       status='available',
+                                       replication_status='active')
+        self.driver.promote_replica.return_value = \
+            {'replication_status': 'inactive'}
+        self.manager.promote_replica(self.adm_ctxt, vol['id'])
+        vol_after = db.volume_get(self.ctxt, vol['id'])
+        self.assertEqual(vol_after['replication_status'], 'inactive')
+
+    def test_promote_replica_fail(self):
+        """Test promote replication when promote fails."""
+        vol = test_utils.create_volume(self.ctxt,
+                                       status='available',
+                                       replication_status='active')
+        self.driver.promote_replica.side_effect = exception.CinderException
+        self.assertRaises(exception.CinderException,
+                          self.manager.promote_replica,
+                          self.adm_ctxt,
+                          vol['id'])
+
+    def test_reenable_replication(self):
+        """Test reenable replication."""
+        vol = test_utils.create_volume(self.ctxt,
+                                       status='available',
+                                       replication_status='error')
+        self.driver.reenable_replication.return_value = \
+            {'replication_status': 'copying'}
+        self.manager.reenable_replication(self.adm_ctxt, vol['id'])
+        vol_after = db.volume_get(self.ctxt, vol['id'])
+        self.assertEqual(vol_after['replication_status'], 'copying')
+
+    @mock.patch('cinder.utils.require_driver_initialized')
+    def test_reenable_replication_uninit_driver(self, _init):
+        """Test reenable replication when driver is not initialized."""
+        _init.side_effect = exception.DriverNotInitialized
+        vol = test_utils.create_volume(self.ctxt,
+                                       status='available',
+                                       replication_status='error')
+        self.assertRaises(exception.DriverNotInitialized,
+                          self.manager.reenable_replication,
+                          self.adm_ctxt,
+                          vol['id'])
+
+    def test_reenable_replication_fail(self):
+        """Test promote replication when driver is not initialized."""
+        vol = test_utils.create_volume(self.ctxt,
+                                       status='available',
+                                       replication_status='error')
+        self.driver.reenable_replication.side_effect = \
+            exception.CinderException
+        self.assertRaises(exception.CinderException,
+                          self.manager.reenable_replication,
+                          self.adm_ctxt,
+                          vol['id'])
index 5c937d1ed2e8cf06297592de1d337627bf652cfc..4a68f6df86c73f966e607e533b07d1acf3f321e6 100644 (file)
@@ -357,6 +357,9 @@ class VolumeTestCase(BaseVolumeTestCase):
             'user_id': 'fake',
             'launched_at': 'DONTCARE',
             'size': 1,
+            'replication_status': 'disabled',
+            'replication_extended_status': None,
+            'replication_driver_data': None,
         }
         self.assertDictMatch(msg['payload'], expected)
         msg = fake_notifier.NOTIFICATIONS[1]
@@ -2445,6 +2448,28 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.assertRaises(exception.CinderException,
                           self.volume.create_volume, ctxt, volume_src['id'])
 
+    @mock.patch(
+        'cinder.volume.driver.VolumeDriver.create_replica_test_volume')
+    def test_create_volume_from_sourcereplica(self, _create_replica_test):
+        """Test volume can be created from a volume replica."""
+        _create_replica_test.return_value = None
+
+        volume_src = tests_utils.create_volume(self.context,
+                                               **self.volume_params)
+        self.volume.create_volume(self.context, volume_src['id'])
+        volume_dst = tests_utils.create_volume(self.context,
+                                               source_replicaid=
+                                               volume_src['id'],
+                                               **self.volume_params)
+        self.volume.create_volume(self.context, volume_dst['id'],
+                                  source_replicaid=volume_src['id'])
+        self.assertEqual('available',
+                         db.volume_get(context.get_admin_context(),
+                                       volume_dst['id']).status)
+        self.assertTrue(_create_replica_test.called)
+        self.volume.delete_volume(self.context, volume_dst['id'])
+        self.volume.delete_volume(self.context, volume_src['id'])
+
     def test_create_volume_from_sourcevol(self):
         """Test volume can be created from a source volume."""
         def fake_create_cloned_volume(volume, src_vref):
@@ -2706,7 +2731,8 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.assertEqual(volume['status'], 'available')
 
     def _retype_volume_exec(self, driver, snap=False, policy='on-demand',
-                            migrate_exc=False, exc=None, diff_equal=False):
+                            migrate_exc=False, exc=None, diff_equal=False,
+                            replica=False):
         elevated = context.get_admin_context()
         project_id = self.context.project_id
 
@@ -2716,9 +2742,14 @@ class VolumeTestCase(BaseVolumeTestCase):
         vol_type = db.volume_type_get_by_name(elevated, 'new')
         db.quota_create(elevated, project_id, 'volumes_new', 10)
 
+        if replica:
+            rep_status = 'active'
+        else:
+            rep_status = 'disabled'
         volume = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host, status='retyping',
-                                           volume_type_id=old_vol_type['id'])
+                                           volume_type_id=old_vol_type['id'],
+                                           replication_status=rep_status)
         if snap:
             self._create_snapshot(volume['id'], size=volume['size'])
         if driver or diff_equal:
@@ -2789,6 +2820,11 @@ class VolumeTestCase(BaseVolumeTestCase):
         self._retype_volume_exec(False, policy='never',
                                  exc=exception.VolumeMigrationFailed)
 
+    def test_retype_volume_migration_with_replica(self):
+        self._retype_volume_exec(False,
+                                 replica=True,
+                                 exc=exception.InvalidVolume)
+
     def test_retype_volume_migration_with_snaps(self):
         self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume)
 
index 51078b65aa82598c9537879cac4abaa83f6a906d..ae15189983c1ae77d0cbb4f54ac925922be79d4e 100644 (file)
@@ -146,6 +146,7 @@ class VolumeRpcAPITestCase(test.TestCase):
                               snapshot_id='fake_snapshot_id',
                               image_id='fake_image_id',
                               source_volid='fake_src_id',
+                              source_replicaid='fake_replica_id',
                               version='1.4')
 
     def test_create_volume_serialization(self):
@@ -160,6 +161,7 @@ class VolumeRpcAPITestCase(test.TestCase):
                               snapshot_id='fake_snapshot_id',
                               image_id='fake_image_id',
                               source_volid='fake_src_id',
+                              source_replicaid='fake_replica_id',
                               version='1.4')
 
     def test_delete_volume(self):
@@ -288,3 +290,15 @@ class VolumeRpcAPITestCase(test.TestCase):
                               volume=self.fake_volume,
                               ref={'lv_name': 'foo'},
                               version='1.15')
+
+    def test_promote_replica(self):
+        self._test_volume_api('promote_replica',
+                              rpc_method='cast',
+                              volume=self.fake_volume,
+                              version='1.17')
+
+    def test_reenable_replica(self):
+        self._test_volume_api('reenable_replication',
+                              rpc_method='cast',
+                              volume=self.fake_volume,
+                              version='1.17')
index 5262a7a543f2c45c2d76d703e6074d121fe16476..a11f9d8238fdcd9d5979bc0a6d64bacc8de8f3c4 100644 (file)
@@ -31,6 +31,9 @@ def create_volume(ctxt,
                   size=1,
                   availability_zone='fake_az',
                   volume_type_id=None,
+                  replication_status='disabled',
+                  replication_extended_status=None,
+                  replication_driver_data=None,
                   **kwargs):
     """Create a volume object in the DB."""
     vol = {}
@@ -48,6 +51,10 @@ def create_volume(ctxt,
         vol['volume_type_id'] = volume_type_id
     for key in kwargs:
         vol[key] = kwargs[key]
+    vol['replication_status'] = replication_status
+    vol['replication_extended_status'] = replication_extended_status
+    vol['replication_driver_data'] = replication_driver_data
+
     return db.volume_create(ctxt, vol)
 
 
index 11ef072ee2b6a0cfa93b20e5482619d47e94a079..06cc222177e53e49304375cc03309b0255849651 100644 (file)
@@ -152,7 +152,8 @@ class API(base.Base):
     def create(self, context, size, name, description, snapshot=None,
                image_id=None, volume_type=None, metadata=None,
                availability_zone=None, source_volume=None,
-               scheduler_hints=None, backup_source_volume=None):
+               scheduler_hints=None, backup_source_volume=None,
+               source_replica=None):
 
         if source_volume and volume_type:
             if volume_type['id'] != source_volume['volume_type_id']:
@@ -161,6 +162,12 @@ class API(base.Base):
                         "You should omit the argument.")
                 raise exception.InvalidInput(reason=msg)
 
+        # When cloning replica (for testing), volume type must be omitted
+        if source_replica and volume_type:
+            msg = _("No volume_type should be provided when creating test "
+                    "replica, type must be omitted.")
+            raise exception.InvalidInput(reason=msg)
+
         if snapshot and volume_type:
             if volume_type['id'] != snapshot['volume_type_id']:
                 msg = _("Invalid volume_type provided (requested type "
@@ -190,6 +197,7 @@ class API(base.Base):
             'scheduler_hints': scheduler_hints,
             'key_manager': self.key_manager,
             'backup_source_volume': backup_source_volume,
+            'source_replica': source_replica,
             'optional_args': {'is_quota_committed': False}
         }
         try:
@@ -475,6 +483,11 @@ class API(base.Base):
             msg = _("Snapshot cannot be created while volume is migrating")
             raise exception.InvalidVolume(reason=msg)
 
+        if volume['status'].startswith('replica_'):
+            # Can't snapshot secondary replica
+            msg = _("Snapshot of secondary replica is not allowed.")
+            raise exception.InvalidVolume(reason=msg)
+
         if ((not force) and (volume['status'] != "available")):
             msg = _("must be available")
             raise exception.InvalidVolume(reason=msg)
@@ -839,6 +852,13 @@ class API(base.Base):
             LOG.error(msg)
             raise exception.InvalidVolume(reason=msg)
 
+        # We only handle non-replicated volumes for now
+        rep_status = volume['replication_status']
+        if rep_status is not None and rep_status != 'disabled':
+            msg = _("Volume must not be replicated.")
+            LOG.error(msg)
+            raise exception.InvalidVolume(reason=msg)
+
         # Make sure the host is in the list of available hosts
         elevated = context.elevated()
         topic = CONF.volume_topic
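As a hedged usage sketch of the extended create() signature above: 'replica_volume' stands in for the secondary volume's dict and is not defined by this patch; volume_type is left unset because passing one together with source_replica now raises InvalidInput.

    # Hedged sketch; 'replica_volume' is a stand-in for the secondary's dict.
    from cinder import context
    from cinder.volume import api as volume_api

    ctxt = context.get_admin_context()
    test_clone = volume_api.API().create(
        ctxt,
        size=replica_volume['size'],
        name='replica-test-clone',
        description='clone of the secondary for replication testing',
        source_replica=replica_volume,   # the replica to clone
        volume_type=None)                # must be omitted for test replicas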
index 0085eb35084a8e92ab32a5410cbd6ae954b8b4b3..64374d361dc0252655f83cbb44d484c24f57cafb 100644 (file)
@@ -278,19 +278,57 @@ class VolumeDriver(object):
     def create_volume(self, volume):
         """Creates a volume. Can optionally return a Dictionary of
         changes to the volume object to be persisted.
+
+        If the volume_type extra specs include
+        'capabilities:replication <is> True', the driver
+        needs to create a volume replica (secondary), and set up replication
+        between the newly created volume and the secondary volume.
+        The returned dictionary should include:
+            volume['replication_status'] = 'copying'
+            volume['replication_extended_status'] = driver specific value
+            volume['replication_driver_data'] = driver specific value
+
         """
         raise NotImplementedError()
 
     def create_volume_from_snapshot(self, volume, snapshot):
-        """Creates a volume from a snapshot."""
+        """Creates a volume from a snapshot.
+
+        If the volume_type extra specs include 'replication: <is> True',
+        the driver needs to create a volume replica (secondary),
+        and set up replication between the newly created volume and
+        the secondary volume.
+        """
+
         raise NotImplementedError()
 
     def create_cloned_volume(self, volume, src_vref):
-        """Creates a clone of the specified volume."""
+        """Creates a clone of the specified volume.
+
+        If the volume_type extra specs include 'replication: <is> True',
+        the driver needs to create a volume replica (secondary)
+        and set up replication between the newly created volume
+        and the secondary volume.
+
+        """
+
+        raise NotImplementedError()
+
+    def create_replica_test_volume(self, volume, src_vref):
+        """Creates a test replica clone of the specified replicated volume.
+
+        Creates a clone of the replicated (secondary) volume.
+
+        """
         raise NotImplementedError()
 
     def delete_volume(self, volume):
-        """Deletes a volume."""
+        """Deletes a volume.
+
+        If the volume_type extra specs include 'replication: <is> True',
+        then the driver needs to delete the volume replica too.
+
+        """
         raise NotImplementedError()
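As a hedged illustration of the create_volume()/delete_volume() contract documented above, a backend driver might look roughly like this; FooISCSIDriver, _create_lun(), _create_secondary_copy() and _delete_lun() are assumptions, not part of this change.

    # Illustrative sketch only; the backend calls are hypothetical.
    from cinder.volume import driver
    from cinder.volume import volume_types


    class FooISCSIDriver(driver.VolumeDriver):

        def create_volume(self, volume):
            self._create_lun(volume)                        # hypothetical
            specs = {}
            if volume.get('volume_type_id'):
                specs = volume_types.get_volume_type_extra_specs(
                    volume['volume_type_id'])
            if specs.get('capabilities:replication') == '<is> True':
                pair = self._create_secondary_copy(volume)  # hypothetical
                return {'replication_status': 'copying',
                        'replication_extended_status': pair['state'],
                        'replication_driver_data': pair['id']}
            return None

        def delete_volume(self, volume):
            if volume.get('replication_driver_data'):
                self._delete_secondary_copy(volume)         # hypothetical
            self._delete_lun(volume)                        # hypothetical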
 
     def create_snapshot(self, snapshot):
@@ -307,6 +345,10 @@ class VolumeDriver(object):
     def get_volume_stats(self, refresh=False):
         """Return the current state of the volume service. If 'refresh' is
            True, run the update first.
+
+           For replication the following capability should be reported:
+           replication_support = True (None or False disables replication)
+
         """
         return None
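A minimal sketch of how a driver's stats report could advertise the capability described above; every figure except replication_support is a placeholder.

    # Sketch: placeholder figures; only 'replication_support' matters here.
    def get_volume_stats(self, refresh=False):
        if refresh or not getattr(self, '_stats', None):
            self._stats = {
                'volume_backend_name': 'foo_backend',       # placeholder
                'vendor_name': 'Foo',
                'driver_version': '1.0.0',
                'storage_protocol': 'iSCSI',
                'total_capacity_gb': 1000,
                'free_capacity_gb': 800,
                'reserved_percentage': 0,
                'replication_support': True,  # None/False disables replication
            }
        return self._stats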
 
@@ -547,7 +589,24 @@ class VolumeDriver(object):
     def retype(self, context, volume, new_type, diff, host):
         """Convert the volume to be of the new type.
 
-        Returns a boolean indicating whether the retype occurred.
+        Returns either:
+        A boolean indicating whether the retype occurred, or
+        A tuple (retyped, model_update) where retyped is a boolean
+        indicating if the retype occurred, and the model_update includes
+        changes for the volume db.
+        If diff['extra_specs'] includes 'replication' then:
+            if ('True', _) then replication should be disabled:
+                The volume replica should be deleted,
+                volume['replication_status'] should be changed to 'disabled',
+                volume['replication_extended_status'] = None
+                volume['replication_driver_data'] = None
+            if (_, 'True') then replication should be enabled:
+                A volume replica (secondary) should be created, and
+                replication should be set up between the volume and the
+                newly created replica:
+                volume['replication_status'] = 'copying'
+                volume['replication_extended_status'] = driver specific value
+                volume['replication_driver_data'] = driver specific value
 
         :param ctxt: Context
         :param volume: A dictionary describing the volume to migrate
@@ -557,7 +616,7 @@ class VolumeDriver(object):
                      host['host'] is its name, and host['capabilities'] is a
                      dictionary of its reported capabilities.
         """
-        return False
+        return False, None
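The (retyped, model_update) contract of retype() could be implemented roughly as below; the exact spelling of the replication key and value inside diff['extra_specs'], and the backend helpers, are assumptions.

    # Hedged sketch of a replication-aware retype(); helpers are hypothetical.
    def retype(self, context, volume, new_type, diff, host):
        old, new = diff.get('extra_specs', {}).get('replication',
                                                   (None, None))
        model_update = None
        if old == '<is> True' and new != '<is> True':
            self._delete_secondary_copy(volume)             # hypothetical
            model_update = {'replication_status': 'disabled',
                            'replication_extended_status': None,
                            'replication_driver_data': None}
        elif new == '<is> True' and old != '<is> True':
            pair = self._create_secondary_copy(volume)      # hypothetical
            model_update = {'replication_status': 'copying',
                            'replication_extended_status': pair['state'],
                            'replication_driver_data': pair['id']}
        return True, model_update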
 
     def accept_transfer(self, context, volume, new_user, new_project):
         """Accept the transfer of a volume for a new user/project."""
@@ -635,6 +694,82 @@ class VolumeDriver(object):
     def validate_connector_has_setting(connector, setting):
         pass
 
+    def reenable_replication(self, context, volume):
+        """Re-enable replication between the replica and primary volume.
+
+        This is used to re-enable/fix the replication between the primary
+        and the secondary. One use is as part of the fail-back process,
+        when you re-synchronize your old primary with the promoted volume
+        (the old replica).
+        Returns model_update for the volume to reflect the actions of the
+        driver.
+        The driver is expected to update the following entries:
+            'replication_status'
+            'replication_extended_status'
+            'replication_driver_data'
+        Possible 'replication_status' values (in model_update) are:
+        'error' - replication in error state
+        'copying' - replication copying data to secondary (inconsistent)
+        'active' - replication copying data to secondary (consistent)
+        'active-stopped' - replication data copy on hold (consistent)
+        'inactive' - replication data copy on hold (inconsistent)
+        Values in 'replication_extended_status' and 'replication_driver_data'
+        are managed by the driver.
+
+        :param context: Context
+        :param volume: A dictionary describing the volume
+
+        """
+        msg = _("sync_replica not implemented.")
+        raise NotImplementedError(msg)
+
+    def get_replication_status(self, context, volume):
+        """Query the actual volume replication status from the driver.
+
+        Returns model_update for the volume.
+        The driver is expected to update the following entries:
+            'replication_status'
+            'replication_extended_status'
+            'replication_driver_data'
+        Possible 'replication_status' values (in model_update) are:
+        'error' - replication in error state
+        'copying' - replication copying data to secondary (inconsistent)
+        'active' - replication copying data to secondary (consistent)
+        'active-stopped' - replication data copy on hold (consistent)
+        'inactive' - replication data copy on hold (inconsistent)
+        Values in 'replication_extended_status' and 'replication_driver_data'
+        are managed by the driver.
+
+        :param context: Context
+        :param volume: A dictionary describing the volume
+        """
+        return None
+
+    def promote_replica(self, context, volume):
+        """Promote the replica to be the primary volume.
+
+        Following this command, replication between the volumes at
+        the storage level should be stopped, the replica should be
+        available to be attached, and the replication status should
+        be 'inactive'.
+
+        Returns model_update for the volume.
+        The driver is expected to update the following entries:
+            'replication_status'
+            'replication_extended_status'
+            'replication_driver_data'
+        Possible 'replication_status' values (in model_update) are:
+        'error' - replication in error state
+        'inactive' - replication data copy on hold (inconsistent)
+        Values in 'replication_extended_status' and 'replication_driver_data'
+        are managed by the driver.
+
+        :param context: Context
+        :param volume: A dictionary describing the volume
+        """
+        msg = _("promote_replica not implemented.")
+        raise NotImplementedError(msg)
+
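To round out the hooks above, a hedged sketch of promote_replica() and reenable_replication() in the same hypothetical driver; _split_mirror() and _resync_mirror() stand in for backend-specific calls.

    # Sketch only; the backend mirror operations are hypothetical.
    def promote_replica(self, context, volume):
        self._split_mirror(volume)                          # hypothetical
        return {'replication_status': 'inactive',
                'replication_extended_status': None,
                'replication_driver_data':
                    volume['replication_driver_data']}

    def reenable_replication(self, context, volume):
        pair = self._resync_mirror(volume)                  # hypothetical
        return {'replication_status': 'copying',
                'replication_extended_status': pair['state'],
                'replication_driver_data': pair['id']}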
     # #######  Interface methods for DataPath (Connector) ########
     def ensure_export(self, context, volume):
         """Synchronously recreates an export for a volume."""
index f65aa1f7015d11558f214e968386d4dcb1ada73a..639d60b22c004fb3c2c058c813b724c1d2015372 100644 (file)
@@ -40,6 +40,7 @@ QUOTAS = quota.QUOTAS
 # from, 'error' being the common example.
 SNAPSHOT_PROCEED_STATUS = ('available',)
 SRC_VOL_PROCEED_STATUS = ('available', 'in-use',)
+REPLICA_PROCEED_STATUS = ('active', 'active-stopped')
 
 
 class ExtractVolumeRequestTask(flow_utils.CinderTask):
@@ -58,7 +59,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
     # reconstructed elsewhere and continued).
     default_provides = set(['availability_zone', 'size', 'snapshot_id',
                             'source_volid', 'volume_type', 'volume_type_id',
-                            'encryption_key_id'])
+                            'encryption_key_id', 'source_replicaid'])
 
     def __init__(self, image_service, availability_zones, **kwargs):
         super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
@@ -111,6 +112,38 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
             source_volid = source_volume['id']
         return source_volid
 
+    @staticmethod
+    def _extract_source_replica(source_replica):
+        """Extracts the volume id from the provided replica (if provided).
+
+        This function validates the input replica_volume dict and checks that
+        the status of that replica_volume is valid for creating a volume from.
+        """
+
+        source_replicaid = None
+        if source_replica is not None:
+            if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
+                msg = _("Unable to create a volume from an originating source"
+                        " volume when its status is not one of %s"
+                        " values")
+                msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
+                # TODO(harlowja): what happens if the status changes after this
+                # initial volume status check occurs??? Seems like someone
+                # could delete the volume after this check passes but before
+                # the volume is officially created?
+                raise exception.InvalidVolume(reason=msg)
+            replication_status = source_replica['replication_status']
+            if replication_status not in REPLICA_PROCEED_STATUS:
+                msg = _("Unable to create a volume from a replica"
+                        " when replication status is not one of %s"
+                        " values")
+                msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
+                # TODO(ronenkat): what happens if the replication status
+                # changes after this initial volume status check occurs???
+                raise exception.InvalidVolume(reason=msg)
+            source_replicaid = source_replica['id']
+        return source_replicaid
+
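Illustratively, the gates enforced by _extract_source_replica() behave as sketched below; the dicts are hand-built stand-ins for DB rows and rely on this module's existing imports.

    # Hand-built stand-ins for volume rows; illustrative only.
    replica = {'id': 'fake-replica-id', 'status': 'available',
               'replication_status': 'active'}
    assert (ExtractVolumeRequestTask._extract_source_replica(replica) ==
            'fake-replica-id')

    replica['replication_status'] = 'copying'  # not in REPLICA_PROCEED_STATUS
    try:
        ExtractVolumeRequestTask._extract_source_replica(replica)
    except exception.InvalidVolume:
        pass                                   # rejected, as documented above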
     @staticmethod
     def _extract_size(size, source_volume, snapshot):
         """Extracts and validates the volume size.
@@ -330,7 +363,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
 
     def execute(self, context, size, snapshot, image_id, source_volume,
                 availability_zone, volume_type, metadata,
-                key_manager, backup_source_volume):
+                key_manager, backup_source_volume, source_replica):
 
         utils.check_exclusive_options(snapshot=snapshot,
                                       imageRef=image_id,
@@ -341,6 +374,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
         # volume will remain available after we do this initial verification??
         snapshot_id = self._extract_snapshot(snapshot)
         source_volid = self._extract_source_volume(source_volume)
+        source_replicaid = self._extract_source_replica(source_replica)
         size = self._extract_size(size, source_volume, snapshot)
 
         self._check_image_metadata(context, image_id, size)
@@ -354,8 +388,15 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
         # should copy encryption metadata from the encrypted volume type to the
         # volume upon creation and propagate that information to each snapshot.
         # This strategy avoid any dependency upon the encrypted volume type.
+        def_vol_type = volume_types.get_default_volume_type()
         if not volume_type and not source_volume and not snapshot:
-            volume_type = volume_types.get_default_volume_type()
+            volume_type = def_vol_type
+
+        # When creating a clone of a replica (replication test), we can't
+        # use the volume type of the replica; therefore, we use the default.
+        # NOTE(ronenkat): this assumes the default type is not replicated.
+        if source_replicaid:
+            volume_type = def_vol_type
 
         volume_type_id = self._get_volume_type_id(volume_type,
                                                   source_volume, snapshot,
@@ -387,6 +428,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
             'volume_type_id': volume_type_id,
             'encryption_key_id': encryption_key_id,
             'qos_specs': specs,
+            'source_replicaid': source_replicaid,
         }
 
 
@@ -401,7 +443,8 @@ class EntryCreateTask(flow_utils.CinderTask):
     def __init__(self, db):
         requires = ['availability_zone', 'description', 'metadata',
                     'name', 'reservations', 'size', 'snapshot_id',
-                    'source_volid', 'volume_type_id', 'encryption_key_id']
+                    'source_volid', 'volume_type_id', 'encryption_key_id',
+                    'source_replicaid']
         super(EntryCreateTask, self).__init__(addons=[ACTION],
                                               requires=requires)
         self.db = db
@@ -427,6 +470,7 @@ class EntryCreateTask(flow_utils.CinderTask):
             # Rename these to the internal name.
             'display_description': kwargs.pop('description'),
             'display_name': kwargs.pop('name'),
+            'replication_status': 'disabled',
         }
 
         # Merge in the other required arguments which should provide the rest
@@ -612,7 +656,7 @@ class VolumeCastTask(flow_utils.CinderTask):
     def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
         requires = ['image_id', 'scheduler_hints', 'snapshot_id',
                     'source_volid', 'volume_id', 'volume_type',
-                    'volume_properties']
+                    'volume_properties', 'source_replicaid']
         super(VolumeCastTask, self).__init__(addons=[ACTION],
                                              requires=requires)
         self.volume_rpcapi = volume_rpcapi
@@ -621,6 +665,7 @@ class VolumeCastTask(flow_utils.CinderTask):
 
     def _cast_create_volume(self, context, request_spec, filter_properties):
         source_volid = request_spec['source_volid']
+        source_replicaid = request_spec['source_replicaid']
         volume_id = request_spec['volume_id']
         snapshot_id = request_spec['snapshot_id']
         image_id = request_spec['image_id']
@@ -639,6 +684,9 @@ class VolumeCastTask(flow_utils.CinderTask):
         elif source_volid:
             source_volume_ref = self.db.volume_get(context, source_volid)
             host = source_volume_ref['host']
+        elif source_replicaid:
+            source_volume_ref = self.db.volume_get(context, source_replicaid)
+            host = source_volume_ref['host']
 
         if not host:
             # Cast to the scheduler and let it handle whatever is needed
@@ -666,7 +714,8 @@ class VolumeCastTask(flow_utils.CinderTask):
                 allow_reschedule=False,
                 snapshot_id=snapshot_id,
                 image_id=image_id,
-                source_volid=source_volid)
+                source_volid=source_volid,
+                source_replicaid=source_replicaid)
 
     def execute(self, context, **kwargs):
         scheduler_hints = kwargs.pop('scheduler_hints', None)
index a8cac6c447659c624c2c28889270506dea6baab9..8dc2f50fbd7a9603a63d0a992a405d06e392c2ce 100644 (file)
@@ -208,7 +208,8 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
     default_provides = 'volume_spec'
 
     def __init__(self, db):
-        requires = ['image_id', 'snapshot_id', 'source_volid']
+        requires = ['image_id', 'snapshot_id', 'source_volid',
+                    'source_replicaid']
         super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
                                                     requires=requires)
         self.db = db
@@ -254,6 +255,18 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
                 'source_volstatus': source_volume_ref['status'],
                 'type': 'source_vol',
             })
+        elif kwargs.get('source_replicaid'):
+            # We are making a clone based on the replica.
+            #
+            # NOTE(harlowja): This will likely fail if the replica
+            # disappeared by the time this call occurred.
+            source_volid = kwargs['source_replicaid']
+            source_volume_ref = self.db.volume_get(context, source_volid)
+            specs.update({
+                'source_replicaid': source_volid,
+                'source_replicastatus': source_volume_ref['status'],
+                'type': 'source_replica',
+            })
         elif kwargs.get('image_id'):
             # We are making an image based volume instead of a raw volume.
             image_href = kwargs['image_id']
@@ -363,6 +376,17 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                     context,
                     source_volid,
                     volume_id)
+            elif kwargs.get('source_replicaid'):
+                src_type = 'source replica'
+                src_id = kwargs['source_replicaid']
+                source_replicaid = src_id
+                LOG.debug(log_template % {'src_type': src_type,
+                                          'src_id': src_id,
+                                          'vol_id': volume_id})
+                self.db.volume_glance_metadata_copy_from_volume_to_volume(
+                    context,
+                    source_replicaid,
+                    volume_id)
             elif kwargs.get('image_id'):
                 src_type = 'image'
                 src_id = kwargs['image_id']
@@ -432,6 +456,27 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                                                      source_volid=source_volid)
         return model_update
 
+    def _create_from_source_replica(self, context, volume_ref,
+                                    source_replicaid, **kwargs):
+        # NOTE(harlowja): if the source volume has disappeared this will be our
+        # detection of that since this database call should fail.
+        #
+        # NOTE(harlowja): likely this is not the best place for this to happen
+        # and we should have proper locks on the source volume while actions
+        # that use the source volume are underway.
+        srcvol_ref = self.db.volume_get(context, source_replicaid)
+        model_update = self.driver.create_replica_test_volume(volume_ref,
+                                                              srcvol_ref)
+        # NOTE(harlowja): Subtasks would be useful here since after this
+        # point the volume has already been created and further failures
+        # will not destroy the volume (although they could in the future).
+        if srcvol_ref.bootable:
+            self._handle_bootable_volume_glance_meta(
+                context,
+                volume_ref['id'],
+                source_replicaid=source_replicaid)
+        return model_update
+
     def _copy_image_to_volume(self, context, volume_ref,
                               image_id, image_location, image_service):
         """Downloads Glance image to the specified volume."""
@@ -588,6 +633,9 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
         elif create_type == 'source_vol':
             model_update = self._create_from_source_volume(
                 context, volume_ref=volume_ref, **volume_spec)
+        elif create_type == 'source_replica':
+            model_update = self._create_from_source_replica(
+                context, volume_ref=volume_ref, **volume_spec)
         elif create_type == 'image':
             model_update = self._create_from_image(context,
                                                    volume_ref=volume_ref,
@@ -661,7 +709,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
 def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
              allow_reschedule, reschedule_context, request_spec,
              filter_properties, snapshot_id=None, image_id=None,
-             source_volid=None):
+             source_volid=None, source_replicaid=None):
     """Constructs and returns the manager entrypoint flow.
 
     This flow will do the following:
@@ -691,6 +739,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
         'snapshot_id': snapshot_id,
         'source_volid': source_volid,
         'volume_id': volume_id,
+        'source_replicaid': source_replicaid,
     }
 
     volume_flow.add(ExtractVolumeRefTask(db, host))
index 6251fc22c0c2370b6c609463a3fb6f0af5feeb66..d81f90acf8d633348b10c8d034a8bbcf692ba473 100644 (file)
@@ -151,7 +151,7 @@ def locked_snapshot_operation(f):
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.16'
+    RPC_API_VERSION = '1.17'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -273,7 +273,8 @@ class VolumeManager(manager.SchedulerDependentManager):
 
     def create_volume(self, context, volume_id, request_spec=None,
                       filter_properties=None, allow_reschedule=True,
-                      snapshot_id=None, image_id=None, source_volid=None):
+                      snapshot_id=None, image_id=None, source_volid=None,
+                      source_replicaid=None):
 
         """Creates the volume."""
         context_saved = context.deepcopy()
@@ -294,6 +295,7 @@ class VolumeManager(manager.SchedulerDependentManager):
                 snapshot_id=snapshot_id,
                 image_id=image_id,
                 source_volid=source_volid,
+                source_replicaid=source_replicaid,
                 allow_reschedule=allow_reschedule,
                 reschedule_context=context_saved,
                 request_spec=request_spec,
@@ -301,7 +303,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         except Exception:
             LOG.exception(_("Failed to create manager volume flow"))
             raise exception.CinderException(
-                _("Failed to create manager volume flow"))
+                _("Failed to create manager volume flow."))
 
         if snapshot_id is not None:
             # Make sure the snapshot is not deleted until we are done with it.
@@ -309,6 +311,9 @@ class VolumeManager(manager.SchedulerDependentManager):
         elif source_volid is not None:
             # Make sure the volume is not deleted until we are done with it.
             locked_action = "%s-%s" % (source_volid, 'delete_volume')
+        elif source_replicaid is not None:
+            # Make sure the volume is not deleted until we are done with it.
+            locked_action = "%s-%s" % (source_replicaid, 'delete_volume')
         else:
             locked_action = None
 
@@ -1263,11 +1268,22 @@ class VolumeManager(manager.SchedulerDependentManager):
             retyped = True
 
         # Call driver to try and change the type
+        retype_model_update = None
         if not retyped:
             try:
                 new_type = volume_types.get_volume_type(context, new_type_id)
-                retyped = self.driver.retype(context, volume_ref, new_type,
-                                             diff, host)
+                ret = self.driver.retype(context,
+                                         volume_ref,
+                                         new_type,
+                                         diff,
+                                         host)
+                # Check if the driver retype provided a model update or
+                # just a retype indication
+                if isinstance(ret, tuple):
+                    retyped, retype_model_update = ret
+                else:
+                    retyped = ret
+
                 if retyped:
                     LOG.info(_("Volume %s: retyped successfully"), volume_id)
             except Exception as ex:
@@ -1294,6 +1310,16 @@ class VolumeManager(manager.SchedulerDependentManager):
                 msg = _("Volume must not have snapshots.")
                 LOG.error(msg)
                 raise exception.InvalidVolume(reason=msg)
+
+            # Don't allow volume with replicas to be migrated
+            rep_status = volume_ref['replication_status']
+            if rep_status is not None and rep_status != 'disabled':
+                _retype_error(context, volume_id, old_reservations,
+                              new_reservations, status_update)
+                msg = _("Volume must not be replicated.")
+                LOG.error(msg)
+                raise exception.InvalidVolume(reason=msg)
+
             self.db.volume_update(context, volume_ref['id'],
                                   {'migration_status': 'starting'})
 
@@ -1305,10 +1331,12 @@ class VolumeManager(manager.SchedulerDependentManager):
                     _retype_error(context, volume_id, old_reservations,
                                   new_reservations, status_update)
         else:
-            self.db.volume_update(context, volume_id,
-                                  {'volume_type_id': new_type_id,
-                                   'host': host['host'],
-                                   'status': status_update['status']})
+            model_update = {'volume_type_id': new_type_id,
+                            'host': host['host'],
+                            'status': status_update['status']}
+            if retype_model_update:
+                model_update.update(retype_model_update)
+            self.db.volume_update(context, volume_id, model_update)
 
         if old_reservations:
             QUOTAS.commit(context, old_reservations, project_id=project_id)
@@ -1317,7 +1345,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         self.publish_service_capabilities(context)
 
     def manage_existing(self, ctxt, volume_id, ref=None):
-        LOG.debug('manage_existing: managing %s' % ref)
+        LOG.debug('manage_existing: managing %s.' % ref)
         try:
             flow_engine = manage_existing.get_flow(
                 ctxt,
@@ -1339,3 +1367,96 @@ class VolumeManager(manager.SchedulerDependentManager):
         # Update volume stats
         self.stats['allocated_capacity_gb'] += volume_ref['size']
         return volume_ref['id']
+
+    def promote_replica(self, ctxt, volume_id):
+        """Promote volume replica secondary to be the primary volume."""
+        try:
+            utils.require_driver_initialized(self.driver)
+        except exception.DriverNotInitialized:
+            with excutils.save_and_reraise_exception():
+                LOG.exception(_("Failed to promote replica for volume %(id)s.")
+                              % {'id': volume_id})
+
+        volume = self.db.volume_get(ctxt, volume_id)
+        model_update = None
+        try:
+            LOG.debug("Volume %s: promote replica.", volume_id)
+            model_update = self.driver.promote_replica(ctxt, volume)
+        except exception.CinderException:
+            err_msg = (_('Error promoting secondary volume to primary'))
+            raise exception.ReplicationError(reason=err_msg,
+                                             volume_id=volume_id)
+
+        try:
+            if model_update:
+                volume = self.db.volume_update(ctxt,
+                                               volume_id,
+                                               model_update)
+        except exception.CinderException:
+            err_msg = (_("Failed updating model"
+                         " with driver provided model %(model)s") %
+                       {'model': model_update})
+            raise exception.ReplicationError(reason=err_msg,
+                                             volume_id=volume_id)
+
+    def reenable_replication(self, ctxt, volume_id):
+        """Re-enable replication of secondary volume with primary volumes."""
+        try:
+            utils.require_driver_initialized(self.driver)
+        except exception.DriverNotInitialized:
+            with excutils.save_and_reraise_exception():
+                LOG.exception(_("Failed to sync replica for volume %(id)s.")
+                              % {'id': volume_id})
+
+        volume = self.db.volume_get(ctxt, volume_id)
+        model_update = None
+        try:
+            LOG.debug("Volume %s: sync replica.", volume_id)
+            model_update = self.driver.reenable_replication(ctxt, volume)
+        except exception.CinderException:
+            err_msg = (_('Error synchronizing secondary volume to primary'))
+            raise exception.ReplicationError(reason=err_msg,
+                                             volume_id=volume_id)
+
+        try:
+            if model_update:
+                volume = self.db.volume_update(ctxt,
+                                               volume_id,
+                                               model_update)
+        except exception.CinderException:
+            err_msg = (_("Failed updating model"
+                         " with driver provided model %(model)s") %
+                       {'model': model_update})
+            raise exception.ReplicationError(reason=err_msg,
+                                             volume_id=volume_id)
+
+    @periodic_task.periodic_task
+    def _update_replication_relationship_status(self, ctxt):
+        LOG.info(_('Updating volume replication status.'))
+        if not self.driver.initialized:
+            if self.driver.configuration.config_group is None:
+                config_group = ''
+            else:
+                config_group = ('(config name %s)' %
+                                self.driver.configuration.config_group)
+
+            LOG.warning(_('Unable to update volume replication status, '
+                          '%(driver_name)s %(driver_version)s '
+                          '%(config_group)s driver is uninitialized.') %
+                        {'driver_name': self.driver.__class__.__name__,
+                         'driver_version': self.driver.get_version(),
+                         'config_group': config_group})
+        else:
+            volumes = self.db.volume_get_all_by_host(ctxt, self.host)
+            for vol in volumes:
+                model_update = None
+                try:
+                    model_update = self.driver.get_replication_status(
+                        ctxt, vol)
+                    if model_update:
+                        self.db.volume_update(ctxt,
+                                              vol['id'],
+                                              model_update)
+                except Exception:
+                    LOG.exception(_("Error checking replication status for "
+                                    "volume %s") % vol['id'])
index 5d251c9fd03d3cbd7c6384d088982cfa93c90e4b..63c10ed986a6f26511ddd6e60483c9946806a880 100644 (file)
@@ -51,6 +51,8 @@ class VolumeAPI(object):
         1.14 - Adds reservation parameter to extend_volume().
         1.15 - Adds manage_existing and unmanage_only flag to delete_volume.
         1.16 - Removes create_export.
+        1.17 - Adds source_replicaid parameter to create_volume(), and adds
+               promote_replica and reenable_replication.
     '''
 
     BASE_RPC_API_VERSION = '1.0'
@@ -59,12 +61,13 @@ class VolumeAPI(object):
         super(VolumeAPI, self).__init__()
         target = messaging.Target(topic=CONF.volume_topic,
                                   version=self.BASE_RPC_API_VERSION)
-        self.client = rpc.get_client(target, '1.15')
+        self.client = rpc.get_client(target, '1.17')
 
     def create_volume(self, ctxt, volume, host,
                       request_spec, filter_properties,
                       allow_reschedule=True,
                       snapshot_id=None, image_id=None,
+                      source_replicaid=None,
                       source_volid=None):
 
         cctxt = self.client.prepare(server=host, version='1.4')
@@ -76,6 +79,7 @@ class VolumeAPI(object):
                    allow_reschedule=allow_reschedule,
                    snapshot_id=snapshot_id,
                    image_id=image_id,
+                   source_replicaid=source_replicaid,
                    source_volid=source_volid),
 
     def delete_volume(self, ctxt, volume, unmanage_only=False):
@@ -165,3 +169,11 @@ class VolumeAPI(object):
     def manage_existing(self, ctxt, volume, ref):
         cctxt = self.client.prepare(server=volume['host'], version='1.15')
         cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
+
+    def promote_replica(self, ctxt, volume):
+        cctxt = self.client.prepare(server=volume['host'], version='1.17')
+        cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
+
+    def reenable_replication(self, ctxt, volume):
+        cctxt = self.client.prepare(server=volume['host'], version='1.17')
+        cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
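For context, a hedged sketch of how a caller (for example, the replication API layer) might drive the two new casts; the context and volume lookup are assumptions, only the rpcapi calls come from this patch.

    # Assumed caller-side code; only the rpcapi calls come from this patch.
    from cinder import context
    from cinder import db
    from cinder.volume import rpcapi as volume_rpcapi

    ctxt = context.get_admin_context()
    volume = db.volume_get(ctxt, 'some-volume-id')          # placeholder id

    rpc = volume_rpcapi.VolumeAPI()
    rpc.promote_replica(ctxt, volume)        # async cast to volume['host']
    rpc.reenable_replication(ctxt, volume)   # async cast to volume['host']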
index 7395965a051a660fa2de2f20e3586515348ad84d..37c17db2e8e5362bc200667c7f9c2a0966b28b42 100644 (file)
@@ -54,7 +54,13 @@ def _usage_from_volume(context, volume_ref, **kw):
                       created_at=null_safe_str(volume_ref['created_at']),
                       status=volume_ref['status'],
                       snapshot_id=volume_ref['snapshot_id'],
-                      size=volume_ref['size'])
+                      size=volume_ref['size'],
+                      replication_status=volume_ref['replication_status'],
+                      replication_extended_status=
+                      volume_ref['replication_extended_status'],
+                      replication_driver_data=
+                      volume_ref['replication_driver_data'],
+                      )
 
     usage_info.update(kw)
     return usage_info
@@ -107,6 +113,40 @@ def notify_about_snapshot_usage(context, snapshot, event_suffix,
                                             usage_info)
 
 
+def notify_about_replication_usage(context, volume, suffix,
+                                   extra_usage_info=None, host=None):
+    if not host:
+        host = CONF.host
+
+    if not extra_usage_info:
+        extra_usage_info = {}
+
+    usage_info = _usage_from_volume(context,
+                                    volume,
+                                    **extra_usage_info)
+
+    rpc.get_notifier('replication', host).info(context,
+                                               'replication.%s' % suffix,
+                                               usage_info)
+
+
+def notify_about_replication_error(context, volume, suffix,
+                                   extra_error_info=None, host=None):
+    if not host:
+        host = CONF.host
+
+    if not extra_error_info:
+        extra_error_info = {}
+
+    usage_info = _usage_from_volume(context,
+                                    volume,
+                                    **extra_error_info)
+
+    rpc.get_notifier('replication', host).error(context,
+                                                'replication.%s' % suffix,
+                                                usage_info)
+
+
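A short, hedged usage sketch of the notifier helpers above; 'ctxt' and 'volume_ref' are assumed to be an admin context and a volume dict loaded elsewhere.

    # Usage sketch; 'ctxt' and 'volume_ref' come from the surrounding code.
    from cinder.volume import utils as volume_utils

    volume_utils.notify_about_replication_usage(
        ctxt, volume_ref, 'promote.start')
    volume_utils.notify_about_replication_error(
        ctxt, volume_ref, 'promote.error',
        extra_error_info={'error_message': 'mirror split failed'})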
 def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute):
     if not bps_limit:
         LOG.debug('Not using bps rate limiting on volume copy')
index 7cc9b416843cc346e070fab74ad9bc5daddc844c..5c06c4eff2cbb20b1081c5e9623464a6e246b4b0 100644 (file)
 # value)
 #transfer_api_class=cinder.transfer.api.API
 
+# The full class name of the volume replication API class
+# (string value)
+#replication_api_class=cinder.replication.api.API
+
 
 #
 # Options defined in cinder.compute
index 73b3bc4232ea95863704c893d1da13a84e64cc00..3ec7fb48401c29128597a60850d5653a8d7ca880 100644 (file)
@@ -52,6 +52,9 @@
     "volume:delete_transfer": [],
     "volume:get_all_transfers": [],
 
+    "volume_extension:replication:promote": ["rule:admin_api"],
+    "volume_extension:replication:reenable": ["rule:admin_api"],
+
     "backup:create" : [],
     "backup:delete": [],
     "backup:get": [],