image_meta = FakeObject()
image_meta['disk_format'] = 'vmdk'
image_meta['size'] = 1 * units.MiB
+ image_meta['properties'] = {'vmware_disktype': 'preallocated'}
image_service = m.CreateMock(glance.GlanceImageService)
image_service.show(mox.IgnoreArg(), image_id).AndReturn(image_meta)
volume = FakeObject()
client.options.transport.cookiejar = cookies
m.StubOutWithMock(self._vim.__class__, 'client')
self._vim.client = client
- m.StubOutWithMock(vmware_images, 'fetch_image')
+ m.StubOutWithMock(vmware_images, 'fetch_flat_image')
timeout = self._config.vmware_image_transfer_timeout_secs
- vmware_images.fetch_image(mox.IgnoreArg(), timeout, image_service,
- image_id, host=self.IP,
- data_center_name=datacenter_name,
- datastore_name=datastore_name,
- cookies=cookies,
- file_path=flat_vmdk_path)
+ vmware_images.fetch_flat_image(mox.IgnoreArg(), timeout, image_service,
+ image_id, image_size=image_meta['size'],
+ host=self.IP,
+ data_center_name=datacenter_name,
+ datastore_name=datastore_name,
+ cookies=cookies,
+ file_path=flat_vmdk_path)
+
+ m.ReplayAll()
+ self._driver.copy_image_to_volume(mox.IgnoreArg(), volume,
+ image_service, image_id)
+ m.UnsetStubs()
+ m.VerifyAll()
+
+ def test_copy_image_to_volume_stream_optimized(self):
+ """Test copy_image_to_volume.
+
+ Test with an acceptable vmdk disk format and streamOptimized disk type.
+ """
+ m = self.mox
+ m.StubOutWithMock(self._driver.__class__, 'session')
+ self._driver.session = self._session
+ m.StubOutWithMock(api.VMwareAPISession, 'vim')
+ self._session.vim = self._vim
+ m.StubOutWithMock(self._driver.__class__, 'volumeops')
+ self._driver.volumeops = self._volumeops
+
+ image_id = 'image-id'
+ size = 5 * units.GiB
+ size_gb = float(size) / units.GiB
+ # image_service.show call
+ image_meta = FakeObject()
+ image_meta['disk_format'] = 'vmdk'
+ image_meta['size'] = size
+ image_meta['properties'] = {'vmware_disktype': 'streamOptimized'}
+ image_service = m.CreateMock(glance.GlanceImageService)
+ image_service.show(mox.IgnoreArg(), image_id).AndReturn(image_meta)
+ # _select_ds_for_volume call
+ (host, rp, folder, summary) = (FakeObject(), FakeObject(),
+ FakeObject(), FakeObject())
+ summary.name = "datastore-1"
+ m.StubOutWithMock(self._driver, '_select_ds_for_volume')
+ self._driver._select_ds_for_volume(size_gb).AndReturn((host, rp,
+ folder,
+ summary))
+ # _get_disk_type call
+ vol_name = 'volume name'
+ volume = FakeObject()
+ volume['name'] = vol_name
+ volume['size'] = size_gb
+ volume['volume_type_id'] = None # _get_disk_type will return 'thin'
+ disk_type = 'thin'
+ # _get_create_spec call
+ m.StubOutWithMock(self._volumeops, '_get_create_spec')
+ self._volumeops._get_create_spec(vol_name, 0, disk_type,
+ summary.name)
+
+ # vim.client.factory.create call
+ class FakeFactory(object):
+ def create(self, name):
+ return mox.MockAnything()
+
+ client = FakeObject()
+ client.factory = FakeFactory()
+ m.StubOutWithMock(self._vim.__class__, 'client')
+ self._vim.client = client
+ # fetch_stream_optimized_image call
+ timeout = self._config.vmware_image_transfer_timeout_secs
+ m.StubOutWithMock(vmware_images, 'fetch_stream_optimized_image')
+ vmware_images.fetch_stream_optimized_image(mox.IgnoreArg(), timeout,
+ image_service, image_id,
+ session=self._session,
+ host=self.IP,
+ resource_pool=rp,
+ vm_folder=folder,
+ vm_create_spec=
+ mox.IgnoreArg(),
+ image_size=size)
m.ReplayAll()
self._driver.copy_image_to_volume(mox.IgnoreArg(), volume,
project_id = 'project-owner-id-123'
volume = FakeObject()
volume['name'] = vol_name
+ size_gb = 5
+ size = size_gb * units.GiB
+ volume['size'] = size_gb
volume['project_id'] = project_id
volume['instance_uuid'] = None
volume['attached_host'] = None
vmdk_file_path = '[%s] %s' % (datastore_name, file_path)
m.StubOutWithMock(self._volumeops, 'get_vmdk_path')
self._volumeops.get_vmdk_path(backing).AndReturn(vmdk_file_path)
- tmp_vmdk = '[datastore1] %s.vmdk' % image_id
- # volumeops.get_host
- host = FakeMor('Host', 'my_host')
- m.StubOutWithMock(self._volumeops, 'get_host')
- self._volumeops.get_host(backing).AndReturn(host)
- # volumeops.get_dc
- datacenter_name = 'my_datacenter'
- datacenter = FakeMor('Datacenter', datacenter_name)
- m.StubOutWithMock(self._volumeops, 'get_dc')
- self._volumeops.get_dc(host).AndReturn(datacenter)
- # volumeops.copy_vmdk_file
- m.StubOutWithMock(self._volumeops, 'copy_vmdk_file')
- self._volumeops.copy_vmdk_file(datacenter, vmdk_file_path, tmp_vmdk)
- # host_ip
- host_ip = self.IP
- # volumeops.get_entity_name
- m.StubOutWithMock(self._volumeops, 'get_entity_name')
- self._volumeops.get_entity_name(datacenter).AndReturn(datacenter_name)
- # cookiejar
- client = FakeObject()
- client.options = FakeObject()
- client.options.transport = FakeObject()
- cookies = FakeObject()
- client.options.transport.cookiejar = cookies
- m.StubOutWithMock(self._vim.__class__, 'client')
- self._vim.client = client
- # flat_vmdk
- flat_vmdk_file = '%s-flat.vmdk' % image_id
# vmware_images.upload_image
timeout = self._config.vmware_image_transfer_timeout_secs
+ host_ip = self.IP
m.StubOutWithMock(vmware_images, 'upload_image')
vmware_images.upload_image(mox.IgnoreArg(), timeout, image_service,
- image_id, project_id, host=host_ip,
- data_center_name=datacenter_name,
- datastore_name=datastore_name,
- cookies=cookies,
- file_path=flat_vmdk_file,
- snapshot_name=image_meta['name'],
+ image_id, project_id, session=self._session,
+ host=host_ip, vm=backing,
+ vmdk_file_path=vmdk_file_path,
+ vmdk_size=size,
+ image_name=image_id,
image_version=1)
- # volumeops.delete_vmdk_file
- m.StubOutWithMock(self._volumeops, 'delete_vmdk_file')
- self._volumeops.delete_vmdk_file(tmp_vmdk, datacenter)
m.ReplayAll()
self._driver.copy_volume_to_image(mox.IgnoreArg(), volume,
LOG.exception(_("Task: %(task)s failed with error: %(err)s.") %
{'task': task, 'err': excep})
done.send_exception(excep)
+
+ def wait_for_lease_ready(self, lease):
+ done = event.Event()
+ loop = loopingcall.FixedIntervalLoopingCall(self._poll_lease,
+ lease,
+ done)
+ loop.start(self._task_poll_interval)
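+ # done.wait() blocks until _poll_lease sends a result on the event;
+ # an exception sent via send_exception() is re-raised here.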
+ done.wait()
+ loop.stop()
+
+ def _poll_lease(self, lease, done):
+ try:
+ state = self.invoke_api(vim_util, 'get_object_property',
+ self.vim, lease, 'state')
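+ # HttpNfcLease state normally moves from 'initializing' to 'ready';
+ # 'error' is terminal.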
+ if state == 'ready':
+ # done
+ LOG.debug(_("Lease is ready."))
+ done.send()
+ return
+ elif state == 'initializing':
+ LOG.debug(_("Lease initializing..."))
+ return
+ elif state == 'error':
+ error_msg = self.invoke_api(vim_util, 'get_object_property',
+ self.vim, lease, 'error')
+ LOG.exception(error_msg)
+ excep = error_util.VimFaultException([], error_msg)
+ done.send_exception(excep)
+ else:
+ # unknown state - complain
+ error_msg = _("Error: unknown lease state %s.") % state
+ raise error_util.VimFaultException([], error_msg)
+ except Exception as excep:
+ LOG.exception(excep)
+ done.send_exception(excep)
"""The pipe to hold the data which the reader writes to and the writer
reads from.
"""
- def __init__(self, maxsize, transfer_size):
+ def __init__(self, maxsize, max_transfer_size):
queue.LightQueue.__init__(self, maxsize)
- self.transfer_size = transfer_size
+ self.max_transfer_size = max_transfer_size
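+ # transferred tracks the total bytes handed out by read(); once it
+ # reaches max_transfer_size, read() returns "" to signal EOF.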
self.transferred = 0
def read(self, chunk_size):
"""Read data from the pipe.
- Chunksize if ignored for we have ensured that the data chunks written
- to the pipe by readers is the same as the chunks asked for by Writer.
+ Chunk size is ignored because we have ensured that the data chunks
+ written to the pipe by the reader are the same size as the chunks the
+ writer asks for.
"""
- if self.transferred < self.transfer_size:
+ if self.transferred < self.max_transfer_size:
data_item = self.get()
self.transferred += len(data_item)
+ LOG.debug(_("Read %(bytes)s out of %(max)s from ThreadSafePipe.") %
+ {'bytes': self.transferred,
+ 'max': self.max_transfer_size})
return data_item
else:
+ LOG.debug(_("Completed transfer of size %s.") % self.transferred)
return ""
def write(self, data):
def tell(self):
"""Get size of the file to be read."""
- return self.transfer_size
+ return self.max_transfer_size
def close(self):
"""A place-holder to maintain consistency."""
it is in the correct ('active') state.
"""
- def __init__(self, context, input, image_service, image_id,
+ def __init__(self, context, input_file, image_service, image_id,
image_meta=None):
if not image_meta:
image_meta = {}
self.context = context
- self.input = input
+ self.input_file = input_file
self.image_service = image_service
self.image_id = image_id
self.image_meta = image_meta
Function to do the image data transfer through an update
and thereon checks if the state is 'active'.
"""
+ LOG.debug(_("Initiating image service update on image: %(image)s "
+ "with meta: %(meta)s") % {'image': self.image_id,
+ 'meta': self.image_meta})
self.image_service.update(self.context,
self.image_id,
self.image_meta,
- data=self.input)
+ data=self.input_file)
self._running = True
while self._running:
try:
image_status = image_meta.get('status')
if image_status == 'active':
self.stop()
+ LOG.debug(_("Glance image: %s is now active.") %
+ self.image_id)
self.done.send(True)
# If the state is killed, then raise an exception.
elif image_status == 'killed':
output file till the transfer is completely done.
"""
- def __init__(self, input, output):
- self.input = input
- self.output = output
+ def __init__(self, input_file, output_file):
+ self.input_file = input_file
+ self.output_file = output_file
self._running = False
self.got_exception = False
self.done = event.Event()
def _inner():
- """Read data from the input and write the same to the output."""
+ """Read data from input and write the same to output."""
self._running = True
while self._running:
try:
- data = self.input.read(None)
+ data = self.input_file.read(None)
if not data:
self.stop()
self.done.send(True)
- self.output.write(data)
+ self.output_file.write(data)
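+ # Give both file handles a chance to report progress; for NFC
+ # lease backed handles this keeps the lease alive during long
+ # transfers.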
+ if hasattr(self.input_file, "update_progress"):
+ self.input_file.update_progress()
+ if hasattr(self.output_file, "update_progress"):
+ self.output_file.update_progress()
greenthread.sleep(IO_THREAD_SLEEP_TIME)
except Exception as exc:
self.stop()
import urlparse
from cinder.openstack.common import log as logging
+from cinder.volume.drivers.vmware import error_util
+from cinder.volume.drivers.vmware import vim_util
LOG = logging.getLogger(__name__)
USER_AGENT = 'OpenStack-ESX-Adapter'
class VMwareHTTPFile(object):
- """Base class for HTTP file."""
+ """Base class for VMDK file access over HTTP."""
def __init__(self, file_handle):
self.eof = False
self.file_handle = file_handle
- def set_eof(self, eof):
- """Set the end of file marker."""
- self.eof = eof
-
- def get_eof(self):
- """Check if the end of file has been reached."""
- return self.eof
-
def close(self):
"""Close the file handle."""
try:
return '%s://[%s]' % (scheme, host)
return '%s://%s' % (scheme, host)
+ def _fix_esx_url(self, url, host):
+ """Fix netloc if it is a ESX host.
+
+ For a ESX host the netloc is set to '*' in the url returned in
+ HttpNfcLeaseInfo. The netloc is right IP when talking to a VC.
+ """
+ urlp = urlparse.urlparse(url)
+ if urlp.netloc == '*':
+ # Use '_netloc' as the throwaway name; '_' would shadow the
+ # gettext alias used for translations.
+ scheme, _netloc, path, params, query, fragment = urlp
+ url = urlparse.urlunparse((scheme, host, path, params,
+ query, fragment))
+ return url
+
+ def find_vmdk_url(self, lease_info, host):
+ """Find the URL corresponding to a vmdk disk in lease info."""
+ url = None
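+ # The backing VM is created with a single vmdk disk, so the first
+ # disk device URL is the one we need.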
+ for deviceUrl in lease_info.deviceUrl:
+ if deviceUrl.disk:
+ url = self._fix_esx_url(deviceUrl.url, host)
+ break
+ return url
+
class VMwareHTTPWriteFile(VMwareHTTPFile):
"""VMware file write handler class."""
super(VMwareHTTPWriteFile, self).close()
-class VMwareHTTPReadFile(VMwareHTTPFile):
- """VMware file read handler class."""
+class VMwareHTTPWriteVmdk(VMwareHTTPFile):
+ """Write VMDK over HTTP using VMware HttpNfcLease."""
- def __init__(self, host, data_center_name, datastore_name, cookies,
- file_path, scheme='https'):
- soap_url = self.get_soap_url(scheme, host)
- base_url = '%s/folder/%s' % (soap_url, urllib.pathname2url(file_path))
- param_list = {'dcPath': data_center_name, 'dsName': datastore_name}
- base_url = base_url + '?' + urllib.urlencode(param_list)
+ def __init__(self, session, host, rp_ref, vm_folder_ref, vm_create_spec,
+ vmdk_size):
+ """Initialize a writer for vmdk file.
+
+ :param session: a valid api session to ESX/VC server
+ :param host: the ESX or VC host IP
+ :param rp_ref: resource pool into which backing VM is imported
+ :param vm_folder_ref: VM folder in ESX/VC inventory to use as parent
+ of backing VM
+ :param vm_create_spec: backing VM created using this create spec
+ :param vmdk_size: VMDK size to be imported into backing VM
+ """
+ self._session = session
+ self._vmdk_size = vmdk_size
+ self._progress = 0
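+ # ImportVApp creates the backing VM and returns an HttpNfcLease;
+ # once the lease is ready, its info carries the URL to which the
+ # vmdk contents can be written.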
+ lease = session.invoke_api(session.vim, 'ImportVApp', rp_ref,
+ spec=vm_create_spec, folder=vm_folder_ref)
+ session.wait_for_lease_ready(lease)
+ self._lease = lease
+ lease_info = session.invoke_api(vim_util, 'get_object_property',
+ session.vim, lease, 'info')
+ # Find the url for vmdk device
+ url = self.find_vmdk_url(lease_info, host)
+ if not url:
+ msg = _("Could not retrieve URL from lease.")
+ LOG.exception(msg)
+ raise error_util.VimException(msg)
+ LOG.info(_("Opening vmdk url: %s for write.") % url)
+
+ # Prepare the http connection to the vmdk url
+ cookies = session.vim.client.options.transport.cookiejar
+ _urlparse = urlparse.urlparse(url)
+ scheme, netloc, path, params, query, fragment = _urlparse
+ if scheme == 'http':
+ conn = httplib.HTTPConnection(netloc)
+ elif scheme == 'https':
+ conn = httplib.HTTPSConnection(netloc)
+ else:
+ # Guard against an unexpected scheme; otherwise 'conn' would be
+ # referenced before assignment below.
+ msg = _("Invalid scheme for vmdk url: %s.") % scheme
+ raise error_util.VimException(msg)
+ if query:
+ path = path + '?' + query
+ conn.putrequest('PUT', path)
+ conn.putheader('User-Agent', USER_AGENT)
+ conn.putheader('Content-Length', str(vmdk_size))
+ conn.putheader('Overwrite', 't')
+ conn.putheader('Cookie', self._build_vim_cookie_headers(cookies))
+ conn.putheader('Content-Type', 'binary/octet-stream')
+ conn.endheaders()
+ self.conn = conn
+ VMwareHTTPFile.__init__(self, conn)
+
+ def write(self, data):
+ """Write to the file."""
+ self._progress += len(data)
+ LOG.debug(_("Written %s bytes to vmdk.") % self._progress)
+ self.file_handle.send(data)
+
+ def update_progress(self):
+ """Updates progress to lease.
+
+ This callback to the lease is essential to keep the lease alive
+ across long-running write operations.
+ """
+ percent = int(float(self._progress) / self._vmdk_size * 100)
+ try:
+ LOG.debug(_("Updating progress to %s percent.") % percent)
+ self._session.invoke_api(self._session.vim,
+ 'HttpNfcLeaseProgress',
+ self._lease, percent=percent)
+ except error_util.VimException as ex:
+ LOG.exception(ex)
+ raise ex
+
+ def close(self):
+ """End the lease and close the connection."""
+ state = self._session.invoke_api(vim_util, 'get_object_property',
+ self._session.vim,
+ self._lease, 'state')
+ if state == 'ready':
+ self._session.invoke_api(self._session.vim, 'HttpNfcLeaseComplete',
+ self._lease)
+ LOG.debug(_("Lease released."))
+ else:
+ LOG.debug(_("Lease is already in state: %s.") % state)
+ super(VMwareHTTPWriteVmdk, self).close()
+
+
+class VMwareHTTPReadVmdk(VMwareHTTPFile):
+ """read VMDK over HTTP using VMware HttpNfcLease."""
+
+ def __init__(self, session, host, vm_ref, vmdk_path, vmdk_size):
+ """Initialize a writer for vmdk file.
+
+ During an export operation the vmdk disk is converted to a
+ stream-optimized sparse disk format, so the size of the VMDK after
+ export may be smaller than the current vmdk disk size.
+
+ :param session: a valid api session to ESX/VC server
+ :param host: the ESX or VC host IP
+ :param vm_ref: backing VM whose vmdk is to be exported
+ :param vmdk_path: datastore relative path to vmdk file to be exported
+ :param vmdk_size: current disk size of vmdk file to be exported
+ """
+ self._session = session
+ self._vmdk_size = vmdk_size
+ self._progress = 0
+ lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
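+ # ExportVm returns an HttpNfcLease; once ready, its info lists the
+ # device URLs from which the vmdk contents can be read.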
+ session.wait_for_lease_ready(lease)
+ self._lease = lease
+ lease_info = session.invoke_api(vim_util, 'get_object_property',
+ session.vim, lease, 'info')
+
+ # find the right disk url corresponding to given vmdk_path
+ url = self.find_vmdk_url(lease_info, host)
+ if not url:
+ msg = _("Could not retrieve URL from lease.")
+ LOG.exception(msg)
+ raise error_util.VimException(msg)
+ LOG.info(_("Opening vmdk url: %s for read.") % url)
+
+ cookies = session.vim.client.options.transport.cookiejar
headers = {'User-Agent': USER_AGENT,
'Cookie': self._build_vim_cookie_headers(cookies)}
- request = urllib2.Request(base_url, None, headers)
+ request = urllib2.Request(url, None, headers)
conn = urllib2.urlopen(request)
VMwareHTTPFile.__init__(self, conn)
def read(self, chunk_size):
- """Read a chunk of data."""
- # We are ignoring the chunk size passed for we want the pipe to hold
- # data items of the chunk-size that Glance Client uses for read
- # while writing.
+ """Read a chunk from file"""
+ self._progress += READ_CHUNKSIZE
+ LOG.debug(_("Read %s bytes from vmdk.") % self._progress)
return self.file_handle.read(READ_CHUNKSIZE)
- def get_size(self):
- """Get size of the file to be read."""
- return self.file_handle.headers.get('Content-Length', -1)
+ def update_progress(self):
+ """Updates progress to lease.
+
+ This callback to the lease is essential to keep the lease alive
+ across long-running read operations.
+ """
+ percent = int(float(self._progress) / self._vmdk_size * 100)
+ try:
+ LOG.debug(_("Updating progress to %s percent.") % percent)
+ self._session.invoke_api(self._session.vim,
+ 'HttpNfcLeaseProgress',
+ self._lease, percent=percent)
+ except error_util.VimException as ex:
+ LOG.exception(ex)
+ raise ex
+
+ def close(self):
+ """End the lease and close the connection."""
+ state = self._session.invoke_api(vim_util, 'get_object_property',
+ self._session.vim,
+ self._lease, 'state')
+ if state == 'ready':
+ self._session.invoke_api(self._session.vim, 'HttpNfcLeaseComplete',
+ self._lease)
+ LOG.debug(_("Lease released."))
+ else:
+ LOG.debug(_("Lease is already in state: %s.") % state)
+ super(VMwareHTTPReadVmdk, self).close()
def _relocate_backing(self, size_gb, backing, host):
pass
+ def _select_ds_for_volume(self, size_gb):
+ """Select datastore that can accommodate a volume of given size.
+
+ Returns the selected datastore summary along with a compute host,
+ its resource pool and the folder where the volume can be created.
+
+ :return: (host, rp, folder, summary)
+ """
+ retrv_result = self.volumeops.get_hosts()
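+ # get_hosts returns paged results; keep fetching pages via
+ # continue_retrieval until a host with a suitable datastore is
+ # found.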
+ while retrv_result:
+ hosts = retrv_result.objects
+ if not hosts:
+ break
+ (selected_host, rp, folder, summary) = (None, None, None, None)
+ for host in hosts:
+ host = host.obj
+ try:
+ (dss, rp) = self.volumeops.get_dss_rp(host)
+ (folder, summary) = self._get_folder_ds_summary(size_gb,
+ rp, dss)
+ selected_host = host
+ break
+ except error_util.VimException as excep:
+ LOG.warn(_("Unable to find suitable datastore for volume "
+ "of size: %(vol)s GB under host: %(host)s. "
+ "More details: %(excep)s") %
+ {'vol': size_gb,
+ 'host': host.obj, 'excep': excep})
+ if selected_host:
+ self.volumeops.cancel_retrieval(retrv_result)
+ return (selected_host, rp, folder, summary)
+ retrv_result = self.volumeops.continue_retrieval(retrv_result)
+
+ msg = _("Unable to find host to accommodate a disk of size: %s "
+ "in the inventory.") % size_gb
+ LOG.error(msg)
+ raise error_util.VimException(msg)
+
def _create_backing_in_inventory(self, volume):
"""Creates backing under any suitable host.
LOG.error(msg)
raise exception.ImageUnacceptable(msg)
- def copy_image_to_volume(self, context, volume, image_service, image_id):
- """Creates volume from image.
+ def _fetch_flat_image(self, context, volume, image_service, image_id,
+ image_size):
+ """Creates a volume from flat glance image.
Creates a backing for the volume under the ESX/VC server and
copies the VMDK flat file from the glance image content.
- The method supports only image with VMDK disk format.
-
- :param context: context
- :param volume: Volume object
- :param image_service: Glance image service
- :param image_id: Glance image id
+ The method assumes the glance image is in VMDK disk format and that
+ its vmware_disktype is "sparse" or "preallocated", but not
+ "streamOptimized".
"""
- LOG.debug(_("Copy glance image: %s to create new volume.") % image_id)
-
- # Verify glance image is vmdk disk format
- metadata = image_service.show(context, image_id)
- disk_format = metadata['disk_format']
- VMwareEsxVmdkDriver._validate_disk_format(disk_format)
-
# Set volume size in GB from image metadata
- volume['size'] = float(metadata['size']) / units.GiB
+ volume['size'] = float(image_size) / units.GiB
# First create empty backing in the inventory
backing = self._create_backing_in_inventory(volume)
cookies = self.session.vim.client.options.transport.cookiejar
LOG.debug(_("Fetching glance image: %(id)s to server: %(host)s.") %
{'id': image_id, 'host': host_ip})
- vmware_images.fetch_image(context, timeout, image_service,
- image_id, host=host_ip,
- data_center_name=datacenter_name,
- datastore_name=datastore_name,
- cookies=cookies,
- file_path=flat_vmdk_path)
+ vmware_images.fetch_flat_image(context, timeout, image_service,
+ image_id, image_size=image_size,
+ host=host_ip,
+ data_center_name=datacenter_name,
+ datastore_name=datastore_name,
+ cookies=cookies,
+ file_path=flat_vmdk_path)
LOG.info(_("Done copying image: %(id)s to volume: %(vol)s.") %
{'id': image_id, 'vol': volume['name']})
except Exception as excep:
self.volumeops.delete_backing(backing)
raise excep
+ def _fetch_stream_optimized_image(self, context, volume, image_service,
+ image_id, image_size):
+ """Creates volume from image using HttpNfc VM import.
+
+ Uses Nfc API to download the VMDK file from Glance. Nfc creates the
+ backing VM that wraps the VMDK in the ESX/VC inventory.
+ This method assumes the glance image is in VMDK disk format and that
+ its vmware_disktype is 'streamOptimized'.
+ """
+ try:
+ # find host in which to create the volume
+ size_gb = volume['size']
+ (host, rp, folder, summary) = self._select_ds_for_volume(size_gb)
+ except error_util.VimException as excep:
+ LOG.exception(_("Exception in _select_ds_for_volume: %s.") % excep)
+ raise excep
+
+ LOG.debug(_("Selected datastore %(ds)s for new volume of size "
+ "%(size)s GB.") % {'ds': summary.name, 'size': size_gb})
+
+ # prepare create spec for backing vm
+ disk_type = VMwareEsxVmdkDriver._get_disk_type(volume)
+
+ # The size of a stream-optimized glance image is often suspect, so it
+ # is better to let VC figure out the disk capacity during import.
+ dummy_disk_size = 0
+ vm_create_spec = self.volumeops._get_create_spec(volume['name'],
+ dummy_disk_size,
+ disk_type,
+ summary.name)
+ # convert vm_create_spec to vm_import_spec
+ cf = self.session.vim.client.factory
+ vm_import_spec = cf.create('ns0:VirtualMachineImportSpec')
+ vm_import_spec.configSpec = vm_create_spec
+
+ try:
+ # fetching image from glance will also create the backing
+ timeout = self.configuration.vmware_image_transfer_timeout_secs
+ host_ip = self.configuration.vmware_host_ip
+ LOG.debug(_("Fetching glance image: %(id)s to server: %(host)s.") %
+ {'id': image_id, 'host': host_ip})
+ vmware_images.fetch_stream_optimized_image(context, timeout,
+ image_service,
+ image_id,
+ session=self.session,
+ host=host_ip,
+ resource_pool=rp,
+ vm_folder=folder,
+ vm_create_spec=
+ vm_import_spec,
+ image_size=image_size)
+ except exception.CinderException as excep:
+ LOG.exception(_("Exception in copy_image_to_volume: %s.") % excep)
+ backing = self.volumeops.get_backing(volume['name'])
+ if backing:
+ LOG.exception(_("Deleting the backing: %s") % backing)
+ # delete the backing
+ self.volumeops.delete_backing(backing)
+ raise excep
+
+ LOG.info(_("Done copying image: %(id)s to volume: %(vol)s.") %
+ {'id': image_id, 'vol': volume['name']})
+
+ def copy_image_to_volume(self, context, volume, image_service, image_id):
+ """Creates volume from image.
+
+ This method only supports Glance images of VMDK disk format.
+ Uses a flat vmdk file copy for "sparse" and "preallocated" disk
+ types. Uses the HttpNfc import API for the "streamOptimized" disk
+ type; this API creates a backing VM that wraps the VMDK in the
+ ESX/VC inventory.
+
+ :param context: context
+ :param volume: Volume object
+ :param image_service: Glance image service
+ :param image_id: Glance image id
+ """
+ LOG.debug(_("Copy glance image: %s to create new volume.") % image_id)
+
+ # Verify glance image is vmdk disk format
+ metadata = image_service.show(context, image_id)
+ VMwareEsxVmdkDriver._validate_disk_format(metadata['disk_format'])
+
+ # Get disk_type for vmdk disk
+ disk_type = None
+ properties = metadata['properties']
+ if properties and 'vmware_disktype' in properties:
+ disk_type = properties['vmware_disktype']
+
+ if disk_type == 'streamOptimized':
+ self._fetch_stream_optimized_image(context, volume, image_service,
+ image_id, metadata['size'])
+ else:
+ self._fetch_flat_image(context, volume, image_service, image_id,
+ metadata['size'])
+
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Creates glance image from volume.
- Upload of only available volume is supported.
+ Upload is supported only for available (unattached) volumes. The
+ uploaded glance image has a vmdk disk type of "streamOptimized" that
+ can only be downloaded using the HttpNfc API.
Steps followed are:
-
1. Get the name of the vmdk file which the volume points to right now.
Can be a chain of snapshots, so we need to know the last in the
chain.
- 2. Call CopyVirtualDisk which coalesces the disk chain to form a
- single vmdk, rather a .vmdk metadata file and a -flat.vmdk disk
- data file.
- 3. Now upload the -flat.vmdk file to the image store.
- 4. Delete the coalesced .vmdk and -flat.vmdk created.
+ 2. Use Nfc APIs to upload the contents of the vmdk file to glance.
"""
+ # if volume is attached raise exception
if volume['instance_uuid'] or volume['attached_host']:
msg = _("Upload to glance of attached volume is not supported.")
LOG.error(msg)
raise exception.InvalidVolume(msg)
+ # validate disk format is vmdk
LOG.debug(_("Copy Volume: %s to new image.") % volume['name'])
VMwareEsxVmdkDriver._validate_disk_format(image_meta['disk_format'])
+ # get backing vm of volume and its vmdk path
backing = self.volumeops.get_backing(volume['name'])
if not backing:
LOG.info(_("Backing not found, creating for volume: %s") %
volume['name'])
backing = self._create_backing_in_inventory(volume)
-
vmdk_file_path = self.volumeops.get_vmdk_path(backing)
- datastore_name = volumeops.split_datastore_path(vmdk_file_path)[0]
-
- # Create a copy of the vmdk into a tmp file
- image_id = image_meta['id']
- tmp_vmdk_file_path = '[%s] %s.vmdk' % (datastore_name, image_id)
- host = self.volumeops.get_host(backing)
- datacenter = self.volumeops.get_dc(host)
- self.volumeops.copy_vmdk_file(datacenter, vmdk_file_path,
- tmp_vmdk_file_path)
- try:
- # Upload image from copy of -flat.vmdk
- timeout = self.configuration.vmware_image_transfer_timeout_secs
- host_ip = self.configuration.vmware_host_ip
- datacenter_name = self.volumeops.get_entity_name(datacenter)
- cookies = self.session.vim.client.options.transport.cookiejar
- flat_vmdk_copy = '%s-flat.vmdk' % image_id
-
- vmware_images.upload_image(context, timeout, image_service,
- image_meta['id'],
- volume['project_id'], host=host_ip,
- data_center_name=datacenter_name,
- datastore_name=datastore_name,
- cookies=cookies,
- file_path=flat_vmdk_copy,
- snapshot_name=image_meta['name'],
- image_version=1)
- LOG.info(_("Done copying volume %(vol)s to a new image %(img)s") %
- {'vol': volume['name'], 'img': image_meta['name']})
- finally:
- # Delete the coalesced .vmdk and -flat.vmdk created
- self.volumeops.delete_vmdk_file(tmp_vmdk_file_path, datacenter)
+
+ # Upload image from vmdk
+ timeout = self.configuration.vmware_image_transfer_timeout_secs
+ host_ip = self.configuration.vmware_host_ip
+
+ vmware_images.upload_image(context, timeout, image_service,
+ image_meta['id'],
+ volume['project_id'],
+ session=self.session,
+ host=host_ip,
+ vm=backing,
+ vmdk_file_path=vmdk_file_path,
+ vmdk_size=volume['size'] * units.GiB,
+ image_name=image_meta['name'],
+ image_version=1)
+ LOG.info(_("Done copying volume %(vol)s to a new image %(img)s") %
+ {'vol': volume['name'], 'img': image_meta['name']})
class VMwareVcVmdkDriver(VMwareEsxVmdkDriver):
QUEUE_BUFFER_SIZE = 10
-def start_transfer(context, timeout_secs, read_file_handle, data_size,
+def start_transfer(context, timeout_secs, read_file_handle, max_data_size,
write_file_handle=None, image_service=None, image_id=None,
image_meta=None):
"""Start the data transfer from the reader to the writer.
# The pipe that acts as an intermediate store of data for reader to write
# to and writer to grab from.
- thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
+ thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, max_data_size)
# The read thread. In case of glance it is the instance of the
# GlanceFileRead class. The glance client read returns an iterator
# and this class wraps that iterator to provide datachunks in calls
write_file_handle.close()
-def fetch_image(context, timeout_secs, image_service, image_id, **kwargs):
- """Download image from the glance image server."""
- LOG.debug(_("Downloading image: %s from glance image server.") % image_id)
- metadata = image_service.show(context, image_id)
- file_size = int(metadata['size'])
+def fetch_flat_image(context, timeout_secs, image_service, image_id, **kwargs):
+ """Download flat image from the glance image server."""
+ LOG.debug(_("Downloading image: %s from glance image server as a flat vmdk"
+ " file.") % image_id)
+ file_size = int(kwargs.get('image_size'))
read_iter = image_service.download(context, image_id)
read_handle = rw_util.GlanceFileRead(read_iter)
write_handle = rw_util.VMwareHTTPWriteFile(kwargs.get('host'),
LOG.info(_("Downloaded image: %s from glance image server.") % image_id)
+def fetch_stream_optimized_image(context, timeout_secs, image_service,
+ image_id, **kwargs):
+ """Download stream optimized image from glance image server."""
+ LOG.debug(_("Downloading image: %s from glance image server using HttpNfc"
+ " import.") % image_id)
+ file_size = int(kwargs.get('image_size'))
+ read_iter = image_service.download(context, image_id)
+ read_handle = rw_util.GlanceFileRead(read_iter)
+ write_handle = rw_util.VMwareHTTPWriteVmdk(kwargs.get('session'),
+ kwargs.get('host'),
+ kwargs.get('resource_pool'),
+ kwargs.get('vm_folder'),
+ kwargs.get('vm_create_spec'),
+ file_size)
+ start_transfer(context, timeout_secs, read_handle, file_size,
+ write_file_handle=write_handle)
+ LOG.info(_("Downloaded image: %s from glance image server.") % image_id)
+
+
def upload_image(context, timeout_secs, image_service, image_id, owner_id,
**kwargs):
- """Upload the snapshot vm disk file to Glance image server."""
- LOG.debug(_("Uploading image: %s to the Glance image server.") % image_id)
- read_handle = rw_util.VMwareHTTPReadFile(kwargs.get('host'),
- kwargs.get('data_center_name'),
- kwargs.get('datastore_name'),
- kwargs.get('cookies'),
- kwargs.get('file_path'))
- file_size = read_handle.get_size()
+ """Upload the vm's disk file to Glance image server."""
+ LOG.debug(_("Uploading image: %s to the Glance image server using HttpNfc"
+ " export.") % image_id)
+ file_size = kwargs.get('vmdk_size')
+ read_handle = rw_util.VMwareHTTPReadVmdk(kwargs.get('session'),
+ kwargs.get('host'),
+ kwargs.get('vm'),
+ kwargs.get('vmdk_file_path'),
+ file_size)
+
# The properties and other fields that we need to set for the image.
+ # It is important to set 'size' to 0 here. Otherwise the glance
+ # client uses the volume size, which may not match the image size
+ # after upload since the disk is converted to a stream-optimized
+ # sparse disk.
image_metadata = {'disk_format': 'vmdk',
'is_public': 'false',
- 'name': kwargs.get('snapshot_name'),
+ 'name': kwargs.get('image_name'),
'status': 'active',
'container_format': 'bare',
- 'size': file_size,
+ 'size': 0,
'properties': {'vmware_image_version':
kwargs.get('image_version'),
+ 'vmware_disktype': 'streamOptimized',
'owner_id': owner_id}}
start_transfer(context, timeout_secs, read_handle, file_size,
image_service=image_service, image_id=image_id,
controller_spec.device = controller_device
disk_device = cf.create('ns0:VirtualDisk')
- disk_device.capacityInKB = int(size_kb)
+ # For very small disks, allocate at least 1 KB.
+ disk_device.capacityInKB = max(1, int(size_kb))
disk_device.key = -101
disk_device.unitNumber = 0
disk_device.controllerKey = -100
disk_device_bkng.eagerlyScrub = True
elif disk_type == 'thin':
disk_device_bkng.thinProvisioned = True
- disk_device_bkng.fileName = '[%s]' % ds_name
+ disk_device_bkng.fileName = ''
disk_device_bkng.diskMode = 'persistent'
disk_device.backing = disk_device_bkng
disk_spec = cf.create('ns0:VirtualDeviceConfigSpec')