ctxt.auth_token = auth_token
+def get_configured_driver(server='ignore_server', path='ignore_path'):
+ configuration = mox.MockObject(conf.Configuration)
+ configuration.xenapi_nfs_server = server
+ configuration.xenapi_nfs_serverpath = path
+ configuration.append_config_values(mox.IgnoreArg())
+ return driver.XenAPINFSDriver(configuration=configuration)
+
+
class DriverTestCase(unittest.TestCase):
def assert_flag(self, flagname):
def test_create_volume(self):
mock = mox.Mox()
- configuration = mox.MockObject(conf.Configuration)
- configuration.xenapi_nfs_server = 'server'
- configuration.xenapi_nfs_serverpath = 'path'
- configuration.append_config_values(mox.IgnoreArg())
-
ops = mock.CreateMock(lib.NFSBasedVolumeOperations)
- drv = driver.XenAPINFSDriver(configuration=configuration)
+ drv = get_configured_driver('server', 'path')
drv.nfs_ops = ops
volume_details = dict(
def test_delete_volume(self):
mock = mox.Mox()
- configuration = mox.MockObject(conf.Configuration)
- configuration.xenapi_nfs_server = 'server'
- configuration.xenapi_nfs_serverpath = 'path'
- configuration.append_config_values(mox.IgnoreArg())
-
ops = mock.CreateMock(lib.NFSBasedVolumeOperations)
- drv = driver.XenAPINFSDriver(configuration=configuration)
+ drv = get_configured_driver('server', 'path')
drv.nfs_ops = ops
ops.delete_volume('server', 'path', 'sr_uuid', 'vdi_uuid')
def test_initialize_connection(self):
mock = mox.Mox()
- configuration = mox.MockObject(conf.Configuration)
- configuration.xenapi_nfs_server = 'server'
- configuration.xenapi_nfs_serverpath = 'path'
- configuration.append_config_values(mox.IgnoreArg())
- drv = driver.XenAPINFSDriver(configuration=configuration)
+ drv = get_configured_driver('server', 'path')
mock.ReplayAll()
result = drv.initialize_connection(
def test_initialize_connection_null_values(self):
mock = mox.Mox()
- configuration = mox.MockObject(conf.Configuration)
- configuration.xenapi_nfs_server = 'server'
- configuration.xenapi_nfs_serverpath = 'path'
- configuration.append_config_values(mox.IgnoreArg())
- drv = driver.XenAPINFSDriver(configuration=configuration)
+ drv = get_configured_driver('server', 'path')
mock.ReplayAll()
result = drv.initialize_connection(
def _setup_mock_driver(self, server, serverpath, sr_base_path="_srbp"):
mock = mox.Mox()
- configuration = mox.MockObject(conf.Configuration)
- configuration.xenapi_nfs_server = server
- configuration.xenapi_nfs_serverpath = serverpath
- configuration.append_config_values(mox.IgnoreArg())
-
- drv = driver.XenAPINFSDriver(configuration=configuration)
+ drv = get_configured_driver(server, serverpath)
ops = mock.CreateMock(lib.NFSBasedVolumeOperations)
db = mock.CreateMock(db_api)
drv.nfs_ops = ops
MockContext('token'), volume, "ignore", "image_id"))
mock.VerifyAll()
+
+ def test_get_volume_stats_reports_required_keys(self):
+ drv = get_configured_driver()
+
+ stats = drv.get_volume_stats()
+
+ required_metrics = [
+ 'volume_backend_name', 'vendor_name', 'driver_version',
+ 'storage_protocol', 'total_capacity_gb', 'free_capacity_gb',
+ 'reserved_percentage'
+ ]
+
+ for metric in required_metrics:
+ self.assertTrue(metric in stats)
+
+ def test_get_volume_stats_reports_unknown_cap(self):
+ drv = get_configured_driver()
+
+ stats = drv.get_volume_stats()
+
+ self.assertEquals('unknown', stats['free_capacity_gb'])
+
+ def test_reported_driver_type(self):
+ drv = get_configured_driver()
+
+ stats = drv.get_volume_stats()
+
+ self.assertEquals('xensm', stats['storage_protocol'])