# Do nothing
self.driver.create_export(None, volume)
self.driver.remove_export(None, volume)
- self.assertRaises(NotImplementedError,
- self.driver.check_for_export, None, volume["id"])
# Make sure volume attributes are as they should be
attributes = self.driver._get_volume_attributes(volume["name"])
return volume_id_list
- def test_check_for_export_with_no_volume(self):
- instance_uuid = '12345678-1234-5678-1234-567812345678'
- self.volume.check_for_export(self.context, instance_uuid)
-
class VolumePolicyTestCase(test.TestCase):
self.driver.create_export(context, volume)
self.driver.ensure_export(context, volume)
self.driver.remove_export(context, volume)
- self.driver.check_for_export(context, 0)
self.assertRaises(NotImplementedError,
self.driver.create_volume_from_snapshot,
"""Removes an export for a logical volume."""
raise NotImplementedError()
- def check_for_export(self, context, volume_id):
- """Make sure volume is exported."""
- raise NotImplementedError()
-
def initialize_connection(self, volume, connector):
"""Allow connection to connector and return connection info."""
raise NotImplementedError()
def terminate_connection(self, volume, connector):
pass
- def check_for_export(self, context, volume_id):
- """Make sure volume is exported."""
- vol_uuid_file = 'volume-%s' % volume_id
- volume_path = os.path.join(FLAGS.volumes_dir, vol_uuid_file)
- if os.path.isfile(volume_path):
- iqn = '%s%s' % (FLAGS.iscsi_target_prefix,
- vol_uuid_file)
- else:
- raise exception.PersistentVolumeFileNotFound(volume_id=volume_id)
-
- # TODO(jdg): In the future move all of the dependent stuff into the
- # cooresponding target admin class
- if not isinstance(self.tgtadm, iscsi.TgtAdm):
- tid = self.db.volume_get_iscsi_target_num(context, volume_id)
- else:
- tid = 0
-
- try:
- self.tgtadm.show_target(tid, iqn=iqn)
- except exception.ProcessExecutionError, e:
- # Instances remount read-only in this case.
- # /etc/init.d/iscsitarget restart and rebooting cinder-volume
- # is better since ensure_export() works at boot time.
- LOG.error(_("Cannot confirm exported volume "
- "id:%(volume_id)s.") % locals())
- raise
-
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and write it to the volume."""
volume_path = self.local_path(volume)
"""Removes an export for a logical volume"""
pass
- def check_for_export(self, context, volume_id):
- """Make sure volume is exported."""
- pass
-
def initialize_connection(self, volume, connector):
return {
'driver_volume_type': 'rbd',
"""Removes an export for a logical volume"""
pass
- def check_for_export(self, context, volume_id):
- """Make sure volume is exported."""
- pass
-
def initialize_connection(self, volume, connector):
return {
'driver_volume_type': 'sheepdog',
def terminate_connection(self, volume, connector):
self.log_action('terminate_connection', volume)
- def check_for_export(self, context, volume_id):
- self.log_action('check_for_export', volume_id)
-
_LOGS = []
@staticmethod
volume_ref = self.db.volume_get(context, volume_id)
self.driver.terminate_connection(volume_ref, connector)
- def check_for_export(self, context, instance_uuid):
- """Make sure whether volume is exported."""
- volumes = self.db.volume_get_all_by_instance_uuid(context,
- instance_uuid)
- for volume in volumes:
- self.driver.check_for_export(context, volume['id'])
-
def _volume_stats_changed(self, stat1, stat2):
if FLAGS.volume_force_update_capabilities:
return True
self._refresh_dfm_luns(lun.HostId)
self._discover_dataset_luns(dataset, clone_name)
- def check_for_export(self, context, volume_id):
- raise NotImplementedError()
-
class NetAppLun(object):
"""Represents a LUN on NetApp storage."""
extra_args['SpaceReserved'] = True
self._clone_lun(lun.handle, new_name, extra_args)
- def check_for_export(self, context, volume_id):
- raise NotImplementedError()
-
def _get_qos_type(self, volume):
"""Get the storage service type for a volume."""
type_id = volume['volume_type_id']
"""Removes an export for a logical volume."""
pass
- def check_for_export(self, context, volume_id):
- """Make sure volume is exported."""
- pass
-
def initialize_connection(self, volume, connector):
"""Allow connection to connector and return connection info."""
data = {'export': volume['provider_location'],
def remove_export(self, context, volume):
pass
- def check_for_export(self, context, volume_id):
- raise NotImplementedError()
-
def initialize_connection(self, volume, connector):
"""Perform the necessary work so that an iSCSI connection can be made.
"""Irrelevant for VPSA volumes. Export removed during detach."""
pass
- def check_for_export(self, context, volume_id):
- """Irrelevant for VPSA volumes. Export created during attachment."""
- pass
-
def initialize_connection(self, volume, connector):
"""
Attach volume to initiator/host.