]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Using HttpNfcLease to transfer vmdk files.
authorSubramanian Neelakantan <subramanian.neelakantan@gmail.com>
Fri, 11 Oct 2013 03:24:18 +0000 (08:54 +0530)
committerSubramanian Neelakantan <subramanian.neelakantan@gmail.com>
Thu, 14 Nov 2013 06:54:29 +0000 (12:24 +0530)
The current VMware driver supported only "sparse" and "preallocated"
vmware_disktype property set in a "vmdk" glance image. Both of these were just
copied over as *-flat.vmdk files into the vmfs or nfs file system of the
underlying datastore. This was used during copy_image_to_volume() api.
Unfortunately for a vsan datastore this work flow breaks since there is no
access to the flat vmdk file in the underlying datastore.

This patch introduces a new vmware_disktype for a glance image called
"streamOptimized". This is a format generated when a VM/vApp is exported using
the HttpNfc APIs. AS the name suggests this is a highly optimized format for
streaming in chunks and thus would result in much faster upload / download
speeds. The driver's copy_volume_to_image() implementation now always uploads
the vmdk contents using HttpNfc api so that the glance image ends up in the
"streamOptimized" disk type. Also the driver's copy_image_to_volume()
implementation now understands a "streamOptmized" disk type and uses HttpNfc to
import that vmdk into a backing VM.

Note that the same "streamOptmized" glance image format will also be supported
by VMware nova driver. This change is in a different patch -
https://review.openstack.org/#/c/53976/

Patch Set 4: Removing changes to requirements.txt that got in by mistake.
Patch Set 5: Fixing a small bug around progress updates.
Patch Set 6: Addressing comments from Avishay.

Fixes bug: 1229998

Change-Id: I6b55945cb61efded826e0bcf7e2a678ebbbbd9d3

cinder/tests/test_vmware_vmdk.py
cinder/volume/drivers/vmware/api.py
cinder/volume/drivers/vmware/io_util.py
cinder/volume/drivers/vmware/read_write_util.py
cinder/volume/drivers/vmware/vmdk.py
cinder/volume/drivers/vmware/vmware_images.py
cinder/volume/drivers/vmware/volumeops.py

index 5846de2091888dbf56b502074790682ecf889c78..7a00eb8a613bece778ddd985229faf58f6b2074c 100644 (file)
@@ -1321,6 +1321,7 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
         image_meta = FakeObject()
         image_meta['disk_format'] = 'vmdk'
         image_meta['size'] = 1 * units.MiB
+        image_meta['properties'] = {'vmware_disktype': 'preallocated'}
         image_service = m.CreateMock(glance.GlanceImageService)
         image_service.show(mox.IgnoreArg(), image_id).AndReturn(image_meta)
         volume = FakeObject()
@@ -1354,14 +1355,87 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
         client.options.transport.cookiejar = cookies
         m.StubOutWithMock(self._vim.__class__, 'client')
         self._vim.client = client
-        m.StubOutWithMock(vmware_images, 'fetch_image')
+        m.StubOutWithMock(vmware_images, 'fetch_flat_image')
         timeout = self._config.vmware_image_transfer_timeout_secs
-        vmware_images.fetch_image(mox.IgnoreArg(), timeout, image_service,
-                                  image_id, host=self.IP,
-                                  data_center_name=datacenter_name,
-                                  datastore_name=datastore_name,
-                                  cookies=cookies,
-                                  file_path=flat_vmdk_path)
+        vmware_images.fetch_flat_image(mox.IgnoreArg(), timeout, image_service,
+                                       image_id, image_size=image_meta['size'],
+                                       host=self.IP,
+                                       data_center_name=datacenter_name,
+                                       datastore_name=datastore_name,
+                                       cookies=cookies,
+                                       file_path=flat_vmdk_path)
+
+        m.ReplayAll()
+        self._driver.copy_image_to_volume(mox.IgnoreArg(), volume,
+                                          image_service, image_id)
+        m.UnsetStubs()
+        m.VerifyAll()
+
+    def test_copy_image_to_volume_stream_optimized(self):
+        """Test copy_image_to_volume.
+
+        Test with an acceptable vmdk disk format and streamOptimized disk type.
+        """
+        m = self.mox
+        m.StubOutWithMock(self._driver.__class__, 'session')
+        self._driver.session = self._session
+        m.StubOutWithMock(api.VMwareAPISession, 'vim')
+        self._session.vim = self._vim
+        m.StubOutWithMock(self._driver.__class__, 'volumeops')
+        self._driver.volumeops = self._volumeops
+
+        image_id = 'image-id'
+        size = 5 * units.GiB
+        size_kb = float(size) / units.KiB
+        size_gb = float(size) / units.GiB
+        # image_service.show call
+        image_meta = FakeObject()
+        image_meta['disk_format'] = 'vmdk'
+        image_meta['size'] = size
+        image_meta['properties'] = {'vmware_disktype': 'streamOptimized'}
+        image_service = m.CreateMock(glance.GlanceImageService)
+        image_service.show(mox.IgnoreArg(), image_id).AndReturn(image_meta)
+        # _select_ds_for_volume call
+        (host, rp, folder, summary) = (FakeObject(), FakeObject(),
+                                       FakeObject(), FakeObject())
+        summary.name = "datastore-1"
+        m.StubOutWithMock(self._driver, '_select_ds_for_volume')
+        self._driver._select_ds_for_volume(size_gb).AndReturn((host, rp,
+                                                               folder,
+                                                               summary))
+        # _get_disk_type call
+        vol_name = 'volume name'
+        volume = FakeObject()
+        volume['name'] = vol_name
+        volume['size'] = size_gb
+        volume['volume_type_id'] = None  # _get_disk_type will return 'thin'
+        disk_type = 'thin'
+        # _get_create_spec call
+        m.StubOutWithMock(self._volumeops, '_get_create_spec')
+        self._volumeops._get_create_spec(vol_name, 0, disk_type,
+                                         summary.name)
+
+        # vim.client.factory.create call
+        class FakeFactory(object):
+            def create(self, name):
+                return mox.MockAnything()
+
+        client = FakeObject()
+        client.factory = FakeFactory()
+        m.StubOutWithMock(self._vim.__class__, 'client')
+        self._vim.client = client
+        # fetch_stream_optimized_image call
+        timeout = self._config.vmware_image_transfer_timeout_secs
+        m.StubOutWithMock(vmware_images, 'fetch_stream_optimized_image')
+        vmware_images.fetch_stream_optimized_image(mox.IgnoreArg(), timeout,
+                                                   image_service, image_id,
+                                                   session=self._session,
+                                                   host=self.IP,
+                                                   resource_pool=rp,
+                                                   vm_folder=folder,
+                                                   vm_create_spec=
+                                                   mox.IgnoreArg(),
+                                                   image_size=size)
 
         m.ReplayAll()
         self._driver.copy_image_to_volume(mox.IgnoreArg(), volume,
@@ -1421,6 +1495,9 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
         project_id = 'project-owner-id-123'
         volume = FakeObject()
         volume['name'] = vol_name
+        size_gb = 5
+        size = size_gb * units.GiB
+        volume['size'] = size_gb
         volume['project_id'] = project_id
         volume['instance_uuid'] = None
         volume['attached_host'] = None
@@ -1434,48 +1511,17 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
         vmdk_file_path = '[%s] %s' % (datastore_name, file_path)
         m.StubOutWithMock(self._volumeops, 'get_vmdk_path')
         self._volumeops.get_vmdk_path(backing).AndReturn(vmdk_file_path)
-        tmp_vmdk = '[datastore1] %s.vmdk' % image_id
-        # volumeops.get_host
-        host = FakeMor('Host', 'my_host')
-        m.StubOutWithMock(self._volumeops, 'get_host')
-        self._volumeops.get_host(backing).AndReturn(host)
-        # volumeops.get_dc
-        datacenter_name = 'my_datacenter'
-        datacenter = FakeMor('Datacenter', datacenter_name)
-        m.StubOutWithMock(self._volumeops, 'get_dc')
-        self._volumeops.get_dc(host).AndReturn(datacenter)
-        # volumeops.copy_vmdk_file
-        m.StubOutWithMock(self._volumeops, 'copy_vmdk_file')
-        self._volumeops.copy_vmdk_file(datacenter, vmdk_file_path, tmp_vmdk)
-        # host_ip
-        host_ip = self.IP
-        # volumeops.get_entity_name
-        m.StubOutWithMock(self._volumeops, 'get_entity_name')
-        self._volumeops.get_entity_name(datacenter).AndReturn(datacenter_name)
-        # cookiejar
-        client = FakeObject()
-        client.options = FakeObject()
-        client.options.transport = FakeObject()
-        cookies = FakeObject()
-        client.options.transport.cookiejar = cookies
-        m.StubOutWithMock(self._vim.__class__, 'client')
-        self._vim.client = client
-        # flat_vmdk
-        flat_vmdk_file = '%s-flat.vmdk' % image_id
         # vmware_images.upload_image
         timeout = self._config.vmware_image_transfer_timeout_secs
+        host_ip = self.IP
         m.StubOutWithMock(vmware_images, 'upload_image')
         vmware_images.upload_image(mox.IgnoreArg(), timeout, image_service,
-                                   image_id, project_id, host=host_ip,
-                                   data_center_name=datacenter_name,
-                                   datastore_name=datastore_name,
-                                   cookies=cookies,
-                                   file_path=flat_vmdk_file,
-                                   snapshot_name=image_meta['name'],
+                                   image_id, project_id, session=self._session,
+                                   host=host_ip, vm=backing,
+                                   vmdk_file_path=vmdk_file_path,
+                                   vmdk_size=size,
+                                   image_name=image_id,
                                    image_version=1)
-        # volumeops.delete_vmdk_file
-        m.StubOutWithMock(self._volumeops, 'delete_vmdk_file')
-        self._volumeops.delete_vmdk_file(tmp_vmdk, datacenter)
 
         m.ReplayAll()
         self._driver.copy_volume_to_image(mox.IgnoreArg(), volume,
index 5f96ea84cacf2dcab93956c9592833c1b73c430b..7b300193ae1ddb126313cee319a564b0618fae60 100644 (file)
@@ -271,3 +271,38 @@ class VMwareAPISession(object):
             LOG.exception(_("Task: %(task)s failed with error: %(err)s.") %
                           {'task': task, 'err': excep})
             done.send_exception(excep)
+
+    def wait_for_lease_ready(self, lease):
+        done = event.Event()
+        loop = loopingcall.FixedIntervalLoopingCall(self._poll_lease,
+                                                    lease,
+                                                    done)
+        loop.start(self._task_poll_interval)
+        done.wait()
+        loop.stop()
+
+    def _poll_lease(self, lease, done):
+        try:
+            state = self.invoke_api(vim_util, 'get_object_property',
+                                    self.vim, lease, 'state')
+            if state == 'ready':
+                # done
+                LOG.debug(_("Lease is ready."))
+                done.send()
+                return
+            elif state == 'initializing':
+                LOG.debug(_("Lease initializing..."))
+                return
+            elif state == 'error':
+                error_msg = self.invoke_api(vim_util, 'get_object_property',
+                                            self.vim, lease, 'error')
+                LOG.exception(error_msg)
+                excep = error_util.VimFaultException([], error_msg)
+                done.send_exception(excep)
+            else:
+                # unknown state - complain
+                error_msg = _("Error: unknown lease state %s.") % state
+                raise error_util.VimFaultException([], error_msg)
+        except Exception as excep:
+            LOG.exception(excep)
+            done.send_exception(excep)
index 435e98d10d639e0b869ca92557fe397f68e8cf2c..61a47aa472167a99ae2af654817ab218e93f6260 100644 (file)
@@ -36,22 +36,26 @@ class ThreadSafePipe(queue.LightQueue):
     """The pipe to hold the data which the reader writes to and the writer
     reads from.
     """
-    def __init__(self, maxsize, transfer_size):
+    def __init__(self, maxsize, max_transfer_size):
         queue.LightQueue.__init__(self, maxsize)
-        self.transfer_size = transfer_size
+        self.max_transfer_size = max_transfer_size
         self.transferred = 0
 
     def read(self, chunk_size):
         """Read data from the pipe.
 
-        Chunksize if ignored for we have ensured that the data chunks written
+        Chunksize is ignored for we have ensured that the data chunks written
         to the pipe by readers is the same as the chunks asked for by Writer.
         """
-        if self.transferred < self.transfer_size:
+        if self.transferred < self.max_transfer_size:
             data_item = self.get()
             self.transferred += len(data_item)
+            LOG.debug(_("Read %(bytes)s out of %(max)s from ThreadSafePipe.") %
+                      {'bytes': self.transferred,
+                       'max': self.max_transfer_size})
             return data_item
         else:
+            LOG.debug(_("Completed transfer of size %s.") % self.transferred)
             return ""
 
     def write(self, data):
@@ -64,7 +68,7 @@ class ThreadSafePipe(queue.LightQueue):
 
     def tell(self):
         """Get size of the file to be read."""
-        return self.transfer_size
+        return self.max_transfer_size
 
     def close(self):
         """A place-holder to maintain consistency."""
@@ -76,13 +80,13 @@ class GlanceWriteThread(object):
     it is in correct ('active')state.
     """
 
-    def __init__(self, context, input, image_service, image_id,
+    def __init__(self, context, input_file, image_service, image_id,
                  image_meta=None):
         if not image_meta:
             image_meta = {}
 
         self.context = context
-        self.input = input
+        self.input_file = input_file
         self.image_service = image_service
         self.image_id = image_id
         self.image_meta = image_meta
@@ -97,10 +101,13 @@ class GlanceWriteThread(object):
             Function to do the image data transfer through an update
             and thereon checks if the state is 'active'.
             """
+            LOG.debug(_("Initiating image service update on image: %(image)s "
+                        "with meta: %(meta)s") % {'image': self.image_id,
+                                                  'meta': self.image_meta})
             self.image_service.update(self.context,
                                       self.image_id,
                                       self.image_meta,
-                                      data=self.input)
+                                      data=self.input_file)
             self._running = True
             while self._running:
                 try:
@@ -109,6 +116,8 @@ class GlanceWriteThread(object):
                     image_status = image_meta.get('status')
                     if image_status == 'active':
                         self.stop()
+                        LOG.debug(_("Glance image: %s is now active.") %
+                                  self.image_id)
                         self.done.send(True)
                     # If the state is killed, then raise an exception.
                     elif image_status == 'killed':
@@ -150,9 +159,9 @@ class IOThread(object):
     output file till the transfer is completely done.
     """
 
-    def __init__(self, input, output):
-        self.input = input
-        self.output = output
+    def __init__(self, input_file, output_file):
+        self.input_file = input_file
+        self.output_file = output_file
         self._running = False
         self.got_exception = False
 
@@ -160,15 +169,19 @@ class IOThread(object):
         self.done = event.Event()
 
         def _inner():
-            """Read data from the input and write the same to the output."""
+            """Read data from input and write the same to output."""
             self._running = True
             while self._running:
                 try:
-                    data = self.input.read(None)
+                    data = self.input_file.read(None)
                     if not data:
                         self.stop()
                         self.done.send(True)
-                    self.output.write(data)
+                    self.output_file.write(data)
+                    if hasattr(self.input_file, "update_progress"):
+                        self.input_file.update_progress()
+                    if hasattr(self.output_file, "update_progress"):
+                        self.output_file.update_progress()
                     greenthread.sleep(IO_THREAD_SLEEP_TIME)
                 except Exception as exc:
                     self.stop()
index 53939cf1763a105b9c8e4a140fc90c2a46cc0623..b174ce04c4470c46c3bfb0588b2f9612e37883b7 100644 (file)
@@ -28,6 +28,8 @@ import urllib2
 import urlparse
 
 from cinder.openstack.common import log as logging
+from cinder.volume.drivers.vmware import error_util
+from cinder.volume.drivers.vmware import vim_util
 
 LOG = logging.getLogger(__name__)
 USER_AGENT = 'OpenStack-ESX-Adapter'
@@ -63,20 +65,12 @@ class GlanceFileRead(object):
 
 
 class VMwareHTTPFile(object):
-    """Base class for HTTP file."""
+    """Base class for VMDK file access over HTTP."""
 
     def __init__(self, file_handle):
         self.eof = False
         self.file_handle = file_handle
 
-    def set_eof(self, eof):
-        """Set the end of file marker."""
-        self.eof = eof
-
-    def get_eof(self):
-        """Check if the end of file has been reached."""
-        return self.eof
-
     def close(self):
         """Close the file handle."""
         try:
@@ -121,6 +115,28 @@ class VMwareHTTPFile(object):
             return '%s://[%s]' % (scheme, host)
         return '%s://%s' % (scheme, host)
 
+    def _fix_esx_url(self, url, host):
+        """Fix netloc if it is a ESX host.
+
+        For a ESX host the netloc is set to '*' in the url returned in
+        HttpNfcLeaseInfo. The netloc is right IP when talking to a VC.
+        """
+        urlp = urlparse.urlparse(url)
+        if urlp.netloc == '*':
+            scheme, _, path, params, query, fragment = urlp
+            url = urlparse.urlunparse((scheme, host, path, params,
+                                       query, fragment))
+        return url
+
+    def find_vmdk_url(self, lease_info, host):
+        """Find the URL corresponding to a vmdk disk in lease info."""
+        url = None
+        for deviceUrl in lease_info.deviceUrl:
+            if deviceUrl.disk:
+                url = self._fix_esx_url(deviceUrl.url, host)
+                break
+        return url
+
 
 class VMwareHTTPWriteFile(VMwareHTTPFile):
     """VMware file write handler class."""
@@ -159,28 +175,165 @@ class VMwareHTTPWriteFile(VMwareHTTPFile):
         super(VMwareHTTPWriteFile, self).close()
 
 
-class VMwareHTTPReadFile(VMwareHTTPFile):
-    """VMware file read handler class."""
+class VMwareHTTPWriteVmdk(VMwareHTTPFile):
+    """Write VMDK over HTTP using VMware HttpNfcLease."""
 
-    def __init__(self, host, data_center_name, datastore_name, cookies,
-                 file_path, scheme='https'):
-        soap_url = self.get_soap_url(scheme, host)
-        base_url = '%s/folder/%s' % (soap_url, urllib.pathname2url(file_path))
-        param_list = {'dcPath': data_center_name, 'dsName': datastore_name}
-        base_url = base_url + '?' + urllib.urlencode(param_list)
+    def __init__(self, session, host, rp_ref, vm_folder_ref, vm_create_spec,
+                 vmdk_size):
+        """Initialize a writer for vmdk file.
+
+        :param session: a valid api session to ESX/VC server
+        :param host: the ESX or VC host IP
+        :param rp_ref: resource pool into which backing VM is imported
+        :param vm_folder_ref: VM folder in ESX/VC inventory to use as parent
+               of backing VM
+        :param vm_create_spec: backing VM created using this create spec
+        :param vmdk_size: VMDK size to be imported into backing VM
+        """
+        self._session = session
+        self._vmdk_size = vmdk_size
+        self._progress = 0
+        lease = session.invoke_api(session.vim, 'ImportVApp', rp_ref,
+                                   spec=vm_create_spec, folder=vm_folder_ref)
+        session.wait_for_lease_ready(lease)
+        self._lease = lease
+        lease_info = session.invoke_api(vim_util, 'get_object_property',
+                                        session.vim, lease, 'info')
+        # Find the url for vmdk device
+        url = self.find_vmdk_url(lease_info, host)
+        if not url:
+            msg = _("Could not retrieve URL from lease.")
+            LOG.exception(msg)
+            raise error_util.VimException(msg)
+        LOG.info(_("Opening vmdk url: %s for write.") % url)
+
+        # Prepare the http connection to the vmdk url
+        cookies = session.vim.client.options.transport.cookiejar
+        _urlparse = urlparse.urlparse(url)
+        scheme, netloc, path, params, query, fragment = _urlparse
+        if scheme == 'http':
+            conn = httplib.HTTPConnection(netloc)
+        elif scheme == 'https':
+            conn = httplib.HTTPSConnection(netloc)
+        if query:
+            path = path + '?' + query
+        conn.putrequest('PUT', path)
+        conn.putheader('User-Agent', USER_AGENT)
+        conn.putheader('Content-Length', str(vmdk_size))
+        conn.putheader('Overwrite', 't')
+        conn.putheader('Cookie', self._build_vim_cookie_headers(cookies))
+        conn.putheader('Content-Type', 'binary/octet-stream')
+        conn.endheaders()
+        self.conn = conn
+        VMwareHTTPFile.__init__(self, conn)
+
+    def write(self, data):
+        """Write to the file."""
+        self._progress += len(data)
+        LOG.debug(_("Written %s bytes to vmdk.") % self._progress)
+        self.file_handle.send(data)
+
+    def update_progress(self):
+        """Updates progress to lease.
+
+        This call back to the lease is essential to keep the lease alive
+        across long running write operations.
+        """
+        percent = int(float(self._progress) / self._vmdk_size * 100)
+        try:
+            LOG.debug(_("Updating progress to %s percent.") % percent)
+            self._session.invoke_api(self._session.vim,
+                                     'HttpNfcLeaseProgress',
+                                     self._lease, percent=percent)
+        except error_util.VimException as ex:
+            LOG.exception(ex)
+            raise ex
+
+    def close(self):
+        """End the lease and close the connection."""
+        state = self._session.invoke_api(vim_util, 'get_object_property',
+                                         self._session.vim,
+                                         self._lease, 'state')
+        if state == 'ready':
+            self._session.invoke_api(self._session.vim, 'HttpNfcLeaseComplete',
+                                     self._lease)
+            LOG.debug(_("Lease released."))
+        else:
+            LOG.debug(_("Lease is already in state: %s.") % state)
+        super(VMwareHTTPWriteVmdk, self).close()
+
+
+class VMwareHTTPReadVmdk(VMwareHTTPFile):
+    """read VMDK over HTTP using VMware HttpNfcLease."""
+
+    def __init__(self, session, host, vm_ref, vmdk_path, vmdk_size):
+        """Initialize a writer for vmdk file.
+
+        During an export operation the vmdk disk is converted to a
+        stream-optimized sparse disk format. So the size of the VMDK
+        after export may be smaller than the current vmdk disk size.
+
+        :param session: a valid api session to ESX/VC server
+        :param host: the ESX or VC host IP
+        :param vm_ref: backing VM whose vmdk is to be exported
+        :param vmdk_path: datastore relative path to vmdk file to be exported
+        :param vmdk_size: current disk size of vmdk file to be exported
+        """
+        self._session = session
+        self._vmdk_size = vmdk_size
+        self._progress = 0
+        lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
+        session.wait_for_lease_ready(lease)
+        self._lease = lease
+        lease_info = session.invoke_api(vim_util, 'get_object_property',
+                                        session.vim, lease, 'info')
+
+        # find the right disk url corresponding to given vmdk_path
+        url = self.find_vmdk_url(lease_info, host)
+        if not url:
+            msg = _("Could not retrieve URL from lease.")
+            LOG.exception(msg)
+            raise error_util.VimException(msg)
+        LOG.info(_("Opening vmdk url: %s for read.") % url)
+
+        cookies = session.vim.client.options.transport.cookiejar
         headers = {'User-Agent': USER_AGENT,
                    'Cookie': self._build_vim_cookie_headers(cookies)}
-        request = urllib2.Request(base_url, None, headers)
+        request = urllib2.Request(url, None, headers)
         conn = urllib2.urlopen(request)
         VMwareHTTPFile.__init__(self, conn)
 
     def read(self, chunk_size):
-        """Read a chunk of data."""
-        # We are ignoring the chunk size passed for we want the pipe to hold
-        # data items of the chunk-size that Glance Client uses for read
-        # while writing.
+        """Read a chunk from file"""
+        self._progress += READ_CHUNKSIZE
+        LOG.debug(_("Read %s bytes from vmdk.") % self._progress)
         return self.file_handle.read(READ_CHUNKSIZE)
 
-    def get_size(self):
-        """Get size of the file to be read."""
-        return self.file_handle.headers.get('Content-Length', -1)
+    def update_progress(self):
+        """Updates progress to lease.
+
+        This call back to the lease is essential to keep the lease alive
+        across long running read operations.
+        """
+        percent = int(float(self._progress) / self._vmdk_size * 100)
+        try:
+            LOG.debug(_("Updating progress to %s percent.") % percent)
+            self._session.invoke_api(self._session.vim,
+                                     'HttpNfcLeaseProgress',
+                                     self._lease, percent=percent)
+        except error_util.VimException as ex:
+            LOG.exception(ex)
+            raise ex
+
+    def close(self):
+        """End the lease and close the connection."""
+        state = self._session.invoke_api(vim_util, 'get_object_property',
+                                         self._session.vim,
+                                         self._lease, 'state')
+        if state == 'ready':
+            self._session.invoke_api(self._session.vim, 'HttpNfcLeaseComplete',
+                                     self._lease)
+            LOG.debug(_("Lease released."))
+        else:
+            LOG.debug(_("Lease is already in state: %s.") % state)
+        super(VMwareHTTPReadVmdk, self).close()
index cc09d2f411eb07bd3fa80ec75e8ecb272bfba16f..82b32f7bc504cec10e34b4b6395309b3a91d9160 100644 (file)
@@ -319,6 +319,43 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
     def _relocate_backing(self, size_gb, backing, host):
         pass
 
+    def _select_ds_for_volume(self, size_gb):
+        """Select datastore that can accommodate a volume of given size.
+
+        Returns the selected datastore summary along with a compute host and
+        its resource pool and folder where the volume can be created
+        :return: (host, rp, folder, summary)
+        """
+        retrv_result = self.volumeops.get_hosts()
+        while retrv_result:
+            hosts = retrv_result.objects
+            if not hosts:
+                break
+            (selected_host, rp, folder, summary) = (None, None, None, None)
+            for host in hosts:
+                host = host.obj
+                try:
+                    (dss, rp) = self.volumeops.get_dss_rp(host)
+                    (folder, summary) = self._get_folder_ds_summary(size_gb,
+                                                                    rp, dss)
+                    selected_host = host
+                    break
+                except error_util.VimException as excep:
+                    LOG.warn(_("Unable to find suitable datastore for volume "
+                               "of size: %(vol)s GB under host: %(host)s. "
+                               "More details: %(excep)s") %
+                             {'vol': size_gb,
+                              'host': host.obj, 'excep': excep})
+            if selected_host:
+                self.volumeops.cancel_retrieval(retrv_result)
+                return (selected_host, rp, folder, summary)
+            retrv_result = self.volumeops.continue_retrieval(retrv_result)
+
+        msg = _("Unable to find host to accommodate a disk of size: %s "
+                "in the inventory.") % size_gb
+        LOG.error(msg)
+        raise error_util.VimException(msg)
+
     def _create_backing_in_inventory(self, volume):
         """Creates backing under any suitable host.
 
@@ -612,27 +649,18 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
             LOG.error(msg)
             raise exception.ImageUnacceptable(msg)
 
-    def copy_image_to_volume(self, context, volume, image_service, image_id):
-        """Creates volume from image.
+    def _fetch_flat_image(self, context, volume, image_service, image_id,
+                          image_size):
+        """Creates a volume from flat glance image.
 
         Creates a backing for the volume under the ESX/VC server and
         copies the VMDK flat file from the glance image content.
-        The method supports only image with VMDK disk format.
-
-        :param context: context
-        :param volume: Volume object
-        :param image_service: Glance image service
-        :param image_id: Glance image id
+        The method assumes glance image is VMDK disk format and its
+        vmware_disktype is "sparse" or "preallocated", but not
+        "streamOptimized"
         """
-        LOG.debug(_("Copy glance image: %s to create new volume.") % image_id)
-
-        # Verify glance image is vmdk disk format
-        metadata = image_service.show(context, image_id)
-        disk_format = metadata['disk_format']
-        VMwareEsxVmdkDriver._validate_disk_format(disk_format)
-
         # Set volume size in GB from image metadata
-        volume['size'] = float(metadata['size']) / units.GiB
+        volume['size'] = float(image_size) / units.GiB
         # First create empty backing in the inventory
         backing = self._create_backing_in_inventory(volume)
 
@@ -653,12 +681,13 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
             cookies = self.session.vim.client.options.transport.cookiejar
             LOG.debug(_("Fetching glance image: %(id)s to server: %(host)s.") %
                       {'id': image_id, 'host': host_ip})
-            vmware_images.fetch_image(context, timeout, image_service,
-                                      image_id, host=host_ip,
-                                      data_center_name=datacenter_name,
-                                      datastore_name=datastore_name,
-                                      cookies=cookies,
-                                      file_path=flat_vmdk_path)
+            vmware_images.fetch_flat_image(context, timeout, image_service,
+                                           image_id, image_size=image_size,
+                                           host=host_ip,
+                                           data_center_name=datacenter_name,
+                                           datastore_name=datastore_name,
+                                           cookies=cookies,
+                                           file_path=flat_vmdk_path)
             LOG.info(_("Done copying image: %(id)s to volume: %(vol)s.") %
                      {'id': image_id, 'vol': volume['name']})
         except Exception as excep:
@@ -669,68 +698,148 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
             self.volumeops.delete_backing(backing)
             raise excep
 
+    def _fetch_stream_optimized_image(self, context, volume, image_service,
+                                      image_id, image_size):
+        """Creates volume from image using HttpNfc VM import.
+
+        Uses Nfc API to download the VMDK file from Glance. Nfc creates the
+        backing VM that wraps the VMDK in the ESX/VC inventory.
+        This method assumes glance image is VMDK disk format and its
+        vmware_disktype is 'streamOptimized'.
+        """
+        try:
+            # find host in which to create the volume
+            size_gb = volume['size']
+            (host, rp, folder, summary) = self._select_ds_for_volume(size_gb)
+        except error_util.VimException as excep:
+            LOG.exception(_("Exception in _select_ds_for_volume: %s.") % excep)
+            raise excep
+
+        LOG.debug(_("Selected datastore %(ds)s for new volume of size "
+                    "%(size)s GB.") % {'ds': summary.name, 'size': size_gb})
+
+        # prepare create spec for backing vm
+        disk_type = VMwareEsxVmdkDriver._get_disk_type(volume)
+
+        # The size of stream optimized glance image is often suspect,
+        # so better let VC figure out the disk capacity during import.
+        dummy_disk_size = 0
+        vm_create_spec = self.volumeops._get_create_spec(volume['name'],
+                                                         dummy_disk_size,
+                                                         disk_type,
+                                                         summary.name)
+        # convert vm_create_spec to vm_import_spec
+        cf = self.session.vim.client.factory
+        vm_import_spec = cf.create('ns0:VirtualMachineImportSpec')
+        vm_import_spec.configSpec = vm_create_spec
+
+        try:
+            # fetching image from glance will also create the backing
+            timeout = self.configuration.vmware_image_transfer_timeout_secs
+            host_ip = self.configuration.vmware_host_ip
+            LOG.debug(_("Fetching glance image: %(id)s to server: %(host)s.") %
+                      {'id': image_id, 'host': host_ip})
+            vmware_images.fetch_stream_optimized_image(context, timeout,
+                                                       image_service,
+                                                       image_id,
+                                                       session=self.session,
+                                                       host=host_ip,
+                                                       resource_pool=rp,
+                                                       vm_folder=folder,
+                                                       vm_create_spec=
+                                                       vm_import_spec,
+                                                       image_size=image_size)
+        except exception.CinderException as excep:
+            LOG.exception(_("Exception in copy_image_to_volume: %s.") % excep)
+            backing = self.volumeops.get_backing(volume['name'])
+            if backing:
+                LOG.exception(_("Deleting the backing: %s") % backing)
+                # delete the backing
+                self.volumeops.delete_backing(backing)
+            raise excep
+
+        LOG.info(_("Done copying image: %(id)s to volume: %(vol)s.") %
+                 {'id': image_id, 'vol': volume['name']})
+
+    def copy_image_to_volume(self, context, volume, image_service, image_id):
+        """Creates volume from image.
+
+        This method only supports Glance image of VMDK disk format.
+        Uses flat vmdk file copy for "sparse" and "preallocated" disk types
+        Uses HttpNfc import API for "streamOptimized" disk types. This API
+        creates a backing VM that wraps the VMDK in the ESX/VC inventory.
+
+        :param context: context
+        :param volume: Volume object
+        :param image_service: Glance image service
+        :param image_id: Glance image id
+        """
+        LOG.debug(_("Copy glance image: %s to create new volume.") % image_id)
+
+        # Verify glance image is vmdk disk format
+        metadata = image_service.show(context, image_id)
+        VMwareEsxVmdkDriver._validate_disk_format(metadata['disk_format'])
+
+        # Get disk_type for vmdk disk
+        disk_type = None
+        properties = metadata['properties']
+        if properties and 'vmware_disktype' in properties:
+            disk_type = properties['vmware_disktype']
+
+        if disk_type == 'streamOptimized':
+            self._fetch_stream_optimized_image(context, volume, image_service,
+                                               image_id, metadata['size'])
+        else:
+            self._fetch_flat_image(context, volume, image_service, image_id,
+                                   metadata['size'])
+
     def copy_volume_to_image(self, context, volume, image_service, image_meta):
         """Creates glance image from volume.
 
-        Upload of only available volume is supported.
+        Upload of only available volume is supported. The uploaded glance image
+        has a vmdk disk type of "streamOptimized" that can only be downloaded
+        using the HttpNfc API.
         Steps followed are:
-
         1. Get the name of the vmdk file which the volume points to right now.
            Can be a chain of snapshots, so we need to know the last in the
            chain.
-        2. Call CopyVirtualDisk which coalesces the disk chain to form a
-           single vmdk, rather a .vmdk metadata file and a -flat.vmdk disk
-           data file.
-        3. Now upload the -flat.vmdk file to the image store.
-        4. Delete the coalesced .vmdk and -flat.vmdk created.
+        2. Use Nfc APIs to upload the contents of the vmdk file to glance.
         """
 
+        # if volume is attached raise exception
         if volume['instance_uuid'] or volume['attached_host']:
             msg = _("Upload to glance of attached volume is not supported.")
             LOG.error(msg)
             raise exception.InvalidVolume(msg)
 
+        # validate disk format is vmdk
         LOG.debug(_("Copy Volume: %s to new image.") % volume['name'])
         VMwareEsxVmdkDriver._validate_disk_format(image_meta['disk_format'])
 
+        # get backing vm of volume and its vmdk path
         backing = self.volumeops.get_backing(volume['name'])
         if not backing:
             LOG.info(_("Backing not found, creating for volume: %s") %
                      volume['name'])
             backing = self._create_backing_in_inventory(volume)
-
         vmdk_file_path = self.volumeops.get_vmdk_path(backing)
-        datastore_name = volumeops.split_datastore_path(vmdk_file_path)[0]
-
-        # Create a copy of the vmdk into a tmp file
-        image_id = image_meta['id']
-        tmp_vmdk_file_path = '[%s] %s.vmdk' % (datastore_name, image_id)
-        host = self.volumeops.get_host(backing)
-        datacenter = self.volumeops.get_dc(host)
-        self.volumeops.copy_vmdk_file(datacenter, vmdk_file_path,
-                                      tmp_vmdk_file_path)
-        try:
-            # Upload image from copy of -flat.vmdk
-            timeout = self.configuration.vmware_image_transfer_timeout_secs
-            host_ip = self.configuration.vmware_host_ip
-            datacenter_name = self.volumeops.get_entity_name(datacenter)
-            cookies = self.session.vim.client.options.transport.cookiejar
-            flat_vmdk_copy = '%s-flat.vmdk' % image_id
-
-            vmware_images.upload_image(context, timeout, image_service,
-                                       image_meta['id'],
-                                       volume['project_id'], host=host_ip,
-                                       data_center_name=datacenter_name,
-                                       datastore_name=datastore_name,
-                                       cookies=cookies,
-                                       file_path=flat_vmdk_copy,
-                                       snapshot_name=image_meta['name'],
-                                       image_version=1)
-            LOG.info(_("Done copying volume %(vol)s to a new image %(img)s") %
-                     {'vol': volume['name'], 'img': image_meta['name']})
-        finally:
-            # Delete the coalesced .vmdk and -flat.vmdk created
-            self.volumeops.delete_vmdk_file(tmp_vmdk_file_path, datacenter)
+
+        # Upload image from vmdk
+        timeout = self.configuration.vmware_image_transfer_timeout_secs
+        host_ip = self.configuration.vmware_host_ip
+
+        vmware_images.upload_image(context, timeout, image_service,
+                                   image_meta['id'],
+                                   volume['project_id'],
+                                   session=self.session,
+                                   host=host_ip,
+                                   vm=backing,
+                                   vmdk_file_path=vmdk_file_path,
+                                   vmdk_size=volume['size'] * units.GiB,
+                                   image_name=image_meta['name'],
+                                   image_version=1)
+        LOG.info(_("Done copying volume %(vol)s to a new image %(img)s") %
+                 {'vol': volume['name'], 'img': image_meta['name']})
 
 
 class VMwareVcVmdkDriver(VMwareEsxVmdkDriver):
index 98fa5d6e9a6e7296f875b0230ad311ff253bd8f8..e4f4ddc4e17aa546e850cf57bc15b9aa1d7942aa 100644 (file)
@@ -30,7 +30,7 @@ LOG = logging.getLogger(__name__)
 QUEUE_BUFFER_SIZE = 10
 
 
-def start_transfer(context, timeout_secs, read_file_handle, data_size,
+def start_transfer(context, timeout_secs, read_file_handle, max_data_size,
                    write_file_handle=None, image_service=None, image_id=None,
                    image_meta=None):
     """Start the data transfer from the reader to the writer.
@@ -45,7 +45,7 @@ def start_transfer(context, timeout_secs, read_file_handle, data_size,
 
     # The pipe that acts as an intermediate store of data for reader to write
     # to and writer to grab from.
-    thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
+    thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, max_data_size)
     # The read thread. In case of glance it is the instance of the
     # GlanceFileRead class. The glance client read returns an iterator
     # and this class wraps that iterator to provide datachunks in calls
@@ -91,11 +91,11 @@ def start_transfer(context, timeout_secs, read_file_handle, data_size,
             write_file_handle.close()
 
 
-def fetch_image(context, timeout_secs, image_service, image_id, **kwargs):
-    """Download image from the glance image server."""
-    LOG.debug(_("Downloading image: %s from glance image server.") % image_id)
-    metadata = image_service.show(context, image_id)
-    file_size = int(metadata['size'])
+def fetch_flat_image(context, timeout_secs, image_service, image_id, **kwargs):
+    """Download flat image from the glance image server."""
+    LOG.debug(_("Downloading image: %s from glance image server as a flat vmdk"
+                " file.") % image_id)
+    file_size = int(kwargs.get('image_size'))
     read_iter = image_service.download(context, image_id)
     read_handle = rw_util.GlanceFileRead(read_iter)
     write_handle = rw_util.VMwareHTTPWriteFile(kwargs.get('host'),
@@ -109,25 +109,50 @@ def fetch_image(context, timeout_secs, image_service, image_id, **kwargs):
     LOG.info(_("Downloaded image: %s from glance image server.") % image_id)
 
 
+def fetch_stream_optimized_image(context, timeout_secs, image_service,
+                                 image_id, **kwargs):
+    """Download stream optimized image from glance image server."""
+    LOG.debug(_("Downloading image: %s from glance image server using HttpNfc"
+                " import.") % image_id)
+    file_size = int(kwargs.get('image_size'))
+    read_iter = image_service.download(context, image_id)
+    read_handle = rw_util.GlanceFileRead(read_iter)
+    write_handle = rw_util.VMwareHTTPWriteVmdk(kwargs.get('session'),
+                                               kwargs.get('host'),
+                                               kwargs.get('resource_pool'),
+                                               kwargs.get('vm_folder'),
+                                               kwargs.get('vm_create_spec'),
+                                               file_size)
+    start_transfer(context, timeout_secs, read_handle, file_size,
+                   write_file_handle=write_handle)
+    LOG.info(_("Downloaded image: %s from glance image server.") % image_id)
+
+
 def upload_image(context, timeout_secs, image_service, image_id, owner_id,
                  **kwargs):
-    """Upload the snapshot vm disk file to Glance image server."""
-    LOG.debug(_("Uploading image: %s to the Glance image server.") % image_id)
-    read_handle = rw_util.VMwareHTTPReadFile(kwargs.get('host'),
-                                             kwargs.get('data_center_name'),
-                                             kwargs.get('datastore_name'),
-                                             kwargs.get('cookies'),
-                                             kwargs.get('file_path'))
-    file_size = read_handle.get_size()
+    """Upload the vm's disk file to Glance image server."""
+    LOG.debug(_("Uploading image: %s to the Glance image server using HttpNfc"
+                " export.") % image_id)
+    file_size = kwargs.get('vmdk_size')
+    read_handle = rw_util.VMwareHTTPReadVmdk(kwargs.get('session'),
+                                             kwargs.get('host'),
+                                             kwargs.get('vm'),
+                                             kwargs.get('vmdk_file_path'),
+                                             file_size)
+
     # The properties and other fields that we need to set for the image.
+    # Important to set the 'size' to 0 here. Otherwise the glance client
+    # uses the volume size which may not be image size after upload since
+    # it is converted to a stream-optimized sparse disk
     image_metadata = {'disk_format': 'vmdk',
                       'is_public': 'false',
-                      'name': kwargs.get('snapshot_name'),
+                      'name': kwargs.get('image_name'),
                       'status': 'active',
                       'container_format': 'bare',
-                      'size': file_size,
+                      'size': 0,
                       'properties': {'vmware_image_version':
                                      kwargs.get('image_version'),
+                                     'vmware_disktype': 'streamOptimized',
                                      'owner_id': owner_id}}
     start_transfer(context, timeout_secs, read_handle, file_size,
                    image_service=image_service, image_id=image_id,
index 7dc96a6784340a3ecae047b7d332952cdd46c476..6fde60e90a252ae9e60ef60947b9be214e795427 100644 (file)
@@ -299,7 +299,8 @@ class VMwareVolumeOps(object):
         controller_spec.device = controller_device
 
         disk_device = cf.create('ns0:VirtualDisk')
-        disk_device.capacityInKB = int(size_kb)
+        # for very small disks allocate at least 1KB
+        disk_device.capacityInKB = max(1, int(size_kb))
         disk_device.key = -101
         disk_device.unitNumber = 0
         disk_device.controllerKey = -100
@@ -308,7 +309,7 @@ class VMwareVolumeOps(object):
             disk_device_bkng.eagerlyScrub = True
         elif disk_type == 'thin':
             disk_device_bkng.thinProvisioned = True
-        disk_device_bkng.fileName = '[%s]' % ds_name
+        disk_device_bkng.fileName = ''
         disk_device_bkng.diskMode = 'persistent'
         disk_device.backing = disk_device_bkng
         disk_spec = cf.create('ns0:VirtualDeviceConfigSpec')