volume_api.enable_replication,
ctxt, volume)
+ def test_enable_replication_invalid_type(self):
+ volume_api = cinder.volume.api.API()
+ ctxt = context.get_admin_context()
+
+ volume = tests_utils.create_volume(self.context,
+ size=1,
+ host=CONF.host,
+ replication_status='disabled')
+ volume['volume_type_id'] = 'dab02f01-b50f-4ed6-8d42-2b5b9680996e'
+ fake_specs = {}
+ with mock.patch.object(volume_types,
+ 'get_volume_type_extra_specs',
+ return_value = fake_specs):
+ self.assertRaises(exception.InvalidVolume,
+ volume_api.enable_replication,
+ ctxt,
+ volume)
+
def test_enable_replication(self):
volume_api = cinder.volume.api.API()
ctxt = context.get_admin_context()
size=1,
host=CONF.host,
replication_status='disabled')
+ volume['volume_type_id'] = 'dab02f01-b50f-4ed6-8d42-2b5b9680996e'
+ fake_specs = {'replication_enabled': '<is> True'}
with mock.patch.object(volume_rpcapi.VolumeAPI,
- 'enable_replication') as mock_enable_rep:
+ 'enable_replication') as mock_enable_rep,\
+ mock.patch.object(volume_types,
+ 'get_volume_type_extra_specs',
+ return_value = fake_specs):
+
volume_api.enable_replication(ctxt, volume)
self.assertTrue(mock_enable_rep.called)
host=CONF.host,
replication_status='disabled')
+ volume['volume_type_id'] = 'dab02f01-b50f-4ed6-8d42-2b5b9680996e'
+ fake_specs = {'replication_enabled': '<is> True'}
with mock.patch.object(volume_rpcapi.VolumeAPI,
- 'disable_replication') as mock_disable_rep:
+ 'disable_replication') as mock_disable_rep,\
+ mock.patch.object(volume_types,
+ 'get_volume_type_extra_specs',
+ return_value = fake_specs):
volume_api.disable_replication(ctxt, volume)
self.assertTrue(mock_disable_rep.called)
cinder.policy.enforce(context, _action, target)
+def valid_replication_volume(func):
+ """Check that the volume is capable of replication.
+
+ This decorator requires the first 3 args of the wrapped function
+ to be (self, context, volume)
+ """
+ @functools.wraps(func)
+ def wrapped(self, context, volume, *args, **kwargs):
+ rep_capable = False
+ if volume.get('volume_type_id', None):
+ extra_specs = volume_types.get_volume_type_extra_specs(
+ volume.get('volume_type_id'))
+ rep_capable = extra_specs.get('replication_enabled',
+ False) == "<is> True"
+ if not rep_capable:
+ msg = _("Volume is not a replication enabled volume, "
+ "replication operations can only be performed "
+ "on volumes that are of type replication_enabled.")
+ raise exception.InvalidVolume(reason=msg)
+ return func(self, context, volume, *args, **kwargs)
+ return wrapped
+
+
class API(base.Base):
"""API for interacting with the volume manager."""
# now they're a special resource, so no.
@wrap_check_policy
+ @valid_replication_volume
def enable_replication(self, ctxt, volume):
-
# NOTE(jdg): details like sync vs async
# and replica count are to be set via the
# volume-type and config files.
self.volume_rpcapi.enable_replication(ctxt, vref)
@wrap_check_policy
+ @valid_replication_volume
def disable_replication(self, ctxt, volume):
valid_disable_status = ['disabled', 'enabled']
self.volume_rpcapi.disable_replication(ctxt, vref)
@wrap_check_policy
+ @valid_replication_volume
def failover_replication(self,
ctxt,
volume,
secondary)
@wrap_check_policy
+ @valid_replication_volume
def list_replication_targets(self, ctxt, volume):
# NOTE(jdg): This collects info for the specified volume