from cinder.volume import driver as parent_driver
from cinder.volume.drivers.xenapi import lib
from cinder.volume.drivers.xenapi import sm as driver
+from cinder.volume.drivers.xenapi import tools
+import contextlib
+import mock
import mox
+import StringIO
import unittest
drv.delete_snapshot(snapshot)
mock.VerifyAll()
- def test_copy_image_to_volume_success(self):
+ def test_copy_image_to_volume_xenserver_case(self):
+ mock, drv = self._setup_mock_driver(
+ 'server', 'serverpath', '/var/run/sr-mount')
+
+ mock.StubOutWithMock(drv, '_use_glance_plugin_to_copy_image_to_volume')
+ mock.StubOutWithMock(driver, 'is_xenserver_image')
+ context = MockContext('token')
+
+ driver.is_xenserver_image(
+ context, 'image_service', 'image_id').AndReturn(True)
+ drv._use_glance_plugin_to_copy_image_to_volume(
+ context, 'volume', 'image_service', 'image_id').AndReturn('result')
+ mock.ReplayAll()
+ result = drv.copy_image_to_volume(
+ context, "volume", "image_service", "image_id")
+ self.assertEquals('result', result)
+ mock.VerifyAll()
+
+ def test_copy_image_to_volume_non_xenserver_case(self):
+ mock, drv = self._setup_mock_driver(
+ 'server', 'serverpath', '/var/run/sr-mount')
+
+ mock.StubOutWithMock(drv, '_use_image_utils_to_pipe_bytes_to_volume')
+ mock.StubOutWithMock(driver, 'is_xenserver_image')
+ context = MockContext('token')
+
+ driver.is_xenserver_image(
+ context, 'image_service', 'image_id').AndReturn(False)
+ drv._use_image_utils_to_pipe_bytes_to_volume(
+ context, 'volume', 'image_service', 'image_id').AndReturn(True)
+ mock.ReplayAll()
+ drv.copy_image_to_volume(
+ context, "volume", "image_service", "image_id")
+ mock.VerifyAll()
+
+ def test_use_image_utils_to_pipe_bytes_to_volume(self):
+ mock, drv = self._setup_mock_driver(
+ 'server', 'serverpath', '/var/run/sr-mount')
+
+ volume = dict(provider_location='sr-uuid/vdi-uuid')
+ context = MockContext('token')
+
+ mock.StubOutWithMock(driver.image_utils, 'fetch_to_raw')
+
+ @contextlib.contextmanager
+ def simple_context(value):
+ yield value
+
+ drv.nfs_ops.volume_attached_here(
+ 'server', 'serverpath', 'sr-uuid', 'vdi-uuid', False).AndReturn(
+ simple_context('device'))
+
+ driver.image_utils.fetch_to_raw(
+ context, 'image_service', 'image_id', 'device')
+
+ mock.ReplayAll()
+ drv._use_image_utils_to_pipe_bytes_to_volume(
+ context, volume, "image_service", "image_id")
+ mock.VerifyAll()
+
+ def test_use_glance_plugin_to_copy_image_to_volume_success(self):
mock, drv = self._setup_mock_driver(
'server', 'serverpath', '/var/run/sr-mount')
'server', 'serverpath', 'sr-uuid', 'vdi-uuid', 2)
mock.ReplayAll()
- drv.copy_image_to_volume(
+ drv._use_glance_plugin_to_copy_image_to_volume(
MockContext('token'), volume, "ignore", "image_id")
mock.VerifyAll()
- def test_copy_image_to_volume_fail(self):
+ def test_use_glance_plugin_to_copy_image_to_volume_fail(self):
mock, drv = self._setup_mock_driver(
'server', 'serverpath', '/var/run/sr-mount')
self.assertRaises(
exception.ImageCopyFailure,
- lambda: drv.copy_image_to_volume(
+ lambda: drv._use_glance_plugin_to_copy_image_to_volume(
MockContext('token'), volume, "ignore", "image_id"))
mock.VerifyAll()
stats = drv.get_volume_stats()
self.assertEquals('xensm', stats['storage_protocol'])
+
+
+class ToolsTest(unittest.TestCase):
+ @mock.patch('cinder.volume.drivers.xenapi.tools._stripped_first_line_of')
+ def test_get_this_vm_uuid(self, mock_read_first_line):
+ mock_read_first_line.return_value = 'someuuid'
+ self.assertEquals('someuuid', tools.get_this_vm_uuid())
+ mock_read_first_line.assert_called_once_with('/sys/hypervisor/uuid')
+
+ def test_stripped_first_line_of(self):
+ mock_context_manager = mock.Mock()
+ mock_context_manager.__enter__ = mock.Mock(
+ return_value=StringIO.StringIO(' blah \n second line \n'))
+ mock_context_manager.__exit__ = mock.Mock(return_value=False)
+ mock_open = mock.Mock(return_value=mock_context_manager)
+
+ with mock.patch('__builtin__.open', mock_open):
+ self.assertEquals(
+ 'blah', tools._stripped_first_line_of('/somefile'))
+
+ mock_open.assert_called_once_with('/somefile', 'rb')
# License for the specific language governing permissions and limitations
# under the License.
+from cinder.volume.drivers.xenapi import tools
import contextlib
import os
import pickle
return self.session.call_xenapi(method, *args)
+class VMOperations(OperationsBase):
+ def get_by_uuid(self, vm_uuid):
+ return self.call_xenapi('VM.get_by_uuid', vm_uuid)
+
+ def get_vbds(self, vm_uuid):
+ return self.call_xenapi('VM.get_VBDs', vm_uuid)
+
+
+class VBDOperations(OperationsBase):
+ def create(self, vm_ref, vdi_ref, userdevice, bootable, mode, type,
+ empty, other_config):
+ vbd_rec = dict(
+ VM=vm_ref,
+ VDI=vdi_ref,
+ userdevice=str(userdevice),
+ bootable=bootable,
+ mode=mode,
+ type=type,
+ empty=empty,
+ other_config=other_config,
+ qos_algorithm_type='',
+ qos_algorithm_params=dict()
+ )
+ return self.call_xenapi('VBD.create', vbd_rec)
+
+ def destroy(self, vbd_ref):
+ self.call_xenapi('VBD.destroy', vbd_ref)
+
+ def get_device(self, vbd_ref):
+ return self.call_xenapi('VBD.get_device', vbd_ref)
+
+ def plug(self, vbd_ref):
+ return self.call_xenapi('VBD.plug', vbd_ref)
+
+ def unplug(self, vbd_ref):
+ return self.call_xenapi('VBD.unplug', vbd_ref)
+
+ def get_vdi(self, vbd_ref):
+ return self.call_xenapi('VBD.get_VDI', vbd_ref)
+
+
+class PoolOperations(OperationsBase):
+ def get_all(self):
+ return self.call_xenapi('pool.get_all')
+
+ def get_default_SR(self, pool_ref):
+ return self.call_xenapi('pool.get_default_SR', pool_ref)
+
+
class PbdOperations(OperationsBase):
def get_all(self):
return self.call_xenapi('PBD.get_all')
self.SR = SrOperations(self)
self.VDI = VdiOperations(self)
self.host = HostOperations(self)
+ self.pool = PoolOperations(self)
+ self.VBD = VBDOperations(self)
+ self.VM = VMOperations(self)
def close(self):
return self.call_xenapi('logout')
os.path.join(sr_base_path, sr_uuid), auth_token, dict())
finally:
self.disconnect_volume(vdi_uuid)
+
+ @contextlib.contextmanager
+ def volume_attached_here(self, server, serverpath, sr_uuid, vdi_uuid,
+ readonly=True):
+ self.connect_volume(server, serverpath, sr_uuid, vdi_uuid)
+
+ with self._session_factory.get_session() as session:
+ vm_uuid = tools.get_this_vm_uuid()
+ vm_ref = session.VM.get_by_uuid(vm_uuid)
+ vdi_ref = session.VDI.get_by_uuid(vdi_uuid)
+ vbd_ref = session.VBD.create(
+ vm_ref, vdi_ref, userdevice='autodetect', bootable=False,
+ mode='RO' if readonly else 'RW', type='disk', empty=False,
+ other_config=dict())
+ session.VBD.plug(vbd_ref)
+ device = session.VBD.get_device(vbd_ref)
+ try:
+ yield "/dev/" + device
+ finally:
+ session.VBD.unplug(vbd_ref)
+ session.VBD.destroy(vbd_ref)
+ self.disconnect_volume(vdi_uuid)
from cinder import exception
from cinder import flags
from cinder.image import glance
+from cinder.image import image_utils
from cinder.openstack.common import log as logging
from cinder.volume import driver
from cinder.volume.drivers.xenapi import lib as xenapi_lib
pass
def copy_image_to_volume(self, context, volume, image_service, image_id):
+ if is_xenserver_image(context, image_service, image_id):
+ return self._use_glance_plugin_to_copy_image_to_volume(
+ context, volume, image_service, image_id)
+
+ return self._use_image_utils_to_pipe_bytes_to_volume(
+ context, volume, image_service, image_id)
+
+ def _use_image_utils_to_pipe_bytes_to_volume(self, context, volume,
+ image_service, image_id):
+ sr_uuid, vdi_uuid = volume['provider_location'].split('/')
+ with self.nfs_ops.volume_attached_here(FLAGS.xenapi_nfs_server,
+ FLAGS.xenapi_nfs_serverpath,
+ sr_uuid, vdi_uuid,
+ False) as device:
+ image_utils.fetch_to_raw(context,
+ image_service,
+ image_id,
+ device)
+
+ def _use_glance_plugin_to_copy_image_to_volume(self, context, volume,
+ image_service, image_id):
sr_uuid, vdi_uuid = volume['provider_location'].split('/')
api_servers = glance.get_api_servers()
reserved_percentage=0)
return self._stats
+
+
+def is_xenserver_image(context, image_service, image_id):
+ image_meta = image_service.show(context, image_id)
+
+ return (
+ image_meta['disk_format'] == 'vhd'
+ and image_meta['container_format'] == 'ovf'
+ )
--- /dev/null
+def _stripped_first_line_of(filename):
+ with open(filename, 'rb') as f:
+ return f.readline().strip()
+
+
+def get_this_vm_uuid():
+ return _stripped_first_line_of('/sys/hypervisor/uuid')
distribute>=0.6.28
coverage
+mock
mox>=0.5.3
nose
nosexcover