import datetime
import hashlib
import os
-import socket
-import tempfile
import uuid
import mock
import six
import cinder
-from cinder.brick.initiator import connector
-from cinder.brick.initiator import linuxfc
from cinder import exception
from cinder import ssh_utils
from cinder import test
CONF = cfg.CONF
-def _get_local_mock_open(fake_data='abcd efgh'):
- mock_context_manager = mock.Mock()
- mock_context_manager.__enter__ = mock.Mock(
- return_value=six.StringIO(fake_data))
- mock_context_manager.__exit__ = mock.Mock(return_value=False)
- return mock_context_manager
-
-
class ExecuteTestCase(test.TestCase):
- def test_retry_on_failure(self):
- fd, tmpfilename = tempfile.mkstemp()
- _, tmpfilename2 = tempfile.mkstemp()
- try:
- fp = os.fdopen(fd, 'w+')
- fp.write('''#!/bin/sh
-# If stdin fails to get passed during one of the runs, make a note.
-if ! grep -q foo
-then
- echo 'failure' > "$1"
-fi
-# If stdin has failed to get passed during this or a previous run, exit early.
-if grep failure "$1"
-then
- exit 1
-fi
-runs="$(cat $1)"
-if [ -z "$runs" ]
-then
- runs=0
-fi
-runs=$(($runs + 1))
-echo $runs > "$1"
-exit 1
-''')
- fp.close()
- os.chmod(tmpfilename, 0o755)
- self.assertRaises(putils.ProcessExecutionError,
- utils.execute,
- tmpfilename, tmpfilename2, attempts=10,
- process_input='foo',
- delay_on_retry=False)
- fp = open(tmpfilename2, 'r+')
- runs = fp.read()
- fp.close()
- self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
- 'always get passed '
- 'correctly')
- runs = int(runs.strip())
- self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,))
- finally:
- os.unlink(tmpfilename)
- os.unlink(tmpfilename2)
-
- def test_unknown_kwargs_raises_error(self):
- self.assertRaises(putils.UnknownArgumentError,
- utils.execute,
- '/usr/bin/env', 'true',
- this_is_not_a_valid_kwarg=True)
-
- def test_check_exit_code_boolean(self):
- utils.execute('/usr/bin/env', 'false', check_exit_code=False)
- self.assertRaises(putils.ProcessExecutionError,
- utils.execute,
- '/usr/bin/env', 'false', check_exit_code=True)
-
- def test_no_retry_on_success(self):
- fd, tmpfilename = tempfile.mkstemp()
- _, tmpfilename2 = tempfile.mkstemp()
- try:
- fp = os.fdopen(fd, 'w+')
- fp.write('''#!/bin/sh
-# If we've already run, bail out.
-grep -q foo "$1" && exit 1
-# Mark that we've run before.
-echo foo > "$1"
-# Check that stdin gets passed correctly.
-grep foo
-''')
- fp.close()
- os.chmod(tmpfilename, 0o755)
- utils.execute(tmpfilename,
- tmpfilename2,
- process_input='foo',
- attempts=2)
- finally:
- os.unlink(tmpfilename)
- os.unlink(tmpfilename2)
+ @mock.patch('cinder.utils.processutils.execute')
+ def test_execute(self, mock_putils_exe):
+ output = utils.execute('a', 1, foo='bar')
+ self.assertEqual(mock_putils_exe.return_value, output)
+ mock_putils_exe.assert_called_once_with('a', 1, foo='bar')
+
+ @mock.patch('cinder.utils.get_root_helper')
+ @mock.patch('cinder.utils.processutils.execute')
+ def test_execute_root(self, mock_putils_exe, mock_get_helper):
+ output = utils.execute('a', 1, foo='bar', run_as_root=True)
+ self.assertEqual(mock_putils_exe.return_value, output)
+ mock_helper = mock_get_helper.return_value
+ mock_putils_exe.assert_called_once_with('a', 1, foo='bar',
+ run_as_root=True,
+ root_helper=mock_helper)
+
+ @mock.patch('cinder.utils.get_root_helper')
+ @mock.patch('cinder.utils.processutils.execute')
+ def test_execute_root_and_helper(self, mock_putils_exe, mock_get_helper):
+ mock_helper = mock.Mock()
+ output = utils.execute('a', 1, foo='bar', run_as_root=True,
+ root_helper=mock_helper)
+ self.assertEqual(mock_putils_exe.return_value, output)
+ self.assertFalse(mock_get_helper.called)
+ mock_putils_exe.assert_called_once_with('a', 1, foo='bar',
+ run_as_root=True,
+ root_helper=mock_helper)
class GetFromPathTestCase(test.TestCase):
obj,
quiet=False)
+ def test_is_int_like(self):
+ self.assertTrue(utils.is_int_like(1))
+ self.assertTrue(utils.is_int_like(-1))
+ self.assertTrue(utils.is_int_like(0b1))
+ self.assertTrue(utils.is_int_like(0o1))
+ self.assertTrue(utils.is_int_like(0x1))
+ self.assertTrue(utils.is_int_like('1'))
+ self.assertFalse(utils.is_int_like(1.0))
+ self.assertFalse(utils.is_int_like('abc'))
+
def test_check_exclusive_options(self):
utils.check_exclusive_options()
utils.check_exclusive_options(something=None,
hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>"
self.assertEqual("hello", utils.sanitize_hostname(hostname))
+ def test_is_valid_boolstr(self):
+ self.assertTrue(utils.is_valid_boolstr(True))
+ self.assertTrue(utils.is_valid_boolstr('trUe'))
+ self.assertTrue(utils.is_valid_boolstr(False))
+ self.assertTrue(utils.is_valid_boolstr('faLse'))
+ self.assertTrue(utils.is_valid_boolstr('yeS'))
+ self.assertTrue(utils.is_valid_boolstr('nO'))
+ self.assertTrue(utils.is_valid_boolstr('y'))
+ self.assertTrue(utils.is_valid_boolstr('N'))
+ self.assertTrue(utils.is_valid_boolstr(1))
+ self.assertTrue(utils.is_valid_boolstr('1'))
+ self.assertTrue(utils.is_valid_boolstr(0))
+ self.assertTrue(utils.is_valid_boolstr('0'))
+
def test_generate_glance_url(self):
generated_url = utils.generate_glance_url()
actual_url = "http://%s:%d" % (CONF.glance_host,
CONF.glance_port)
self.assertEqual(generated_url, actual_url)
- def test_read_file_as_root(self):
- def fake_execute(*args, **kwargs):
- if args[1] == 'bad':
- raise putils.ProcessExecutionError
- return 'fakecontents', None
-
- self.stubs.Set(utils, 'execute', fake_execute)
- contents = utils.read_file_as_root('good')
- self.assertEqual(contents, 'fakecontents')
+ @mock.patch('os.path.join', side_effect=lambda x, y: '/'.join((x, y)))
+ def test_make_dev_path(self, mock_join):
+ self.assertEqual('/dev/xvda', utils.make_dev_path('xvda'))
+ self.assertEqual('/dev/xvdb1', utils.make_dev_path('xvdb', 1))
+ self.assertEqual('/foo/xvdc1', utils.make_dev_path('xvdc', 1, '/foo'))
+
+ @mock.patch('cinder.utils.execute')
+ def test_read_file_as_root(self, mock_exec):
+ out = mock.Mock()
+ err = mock.Mock()
+ mock_exec.return_value = (out, err)
+ test_filepath = '/some/random/path'
+ output = utils.read_file_as_root(test_filepath)
+ mock_exec.assert_called_once_with('cat', test_filepath,
+ run_as_root=True)
+ self.assertEqual(out, output)
+
+ @mock.patch('cinder.utils.execute',
+ side_effect=putils.ProcessExecutionError)
+ def test_read_file_as_root_fails(self, mock_exec):
+ test_filepath = '/some/random/path'
self.assertRaises(exception.FileNotFound,
- utils.read_file_as_root, 'bad')
-
- def test_temporary_chown(self):
- def fake_execute(*args, **kwargs):
- if args[0] == 'chown':
- fake_execute.uid = args[1]
- self.stubs.Set(utils, 'execute', fake_execute)
-
- with tempfile.NamedTemporaryFile() as f:
- with utils.temporary_chown(f.name, owner_uid=2):
- self.assertEqual(fake_execute.uid, 2)
- self.assertEqual(fake_execute.uid, os.getuid())
+ utils.read_file_as_root,
+ test_filepath)
@mock.patch('oslo.utils.timeutils.utcnow')
def test_service_is_up(self, mock_utcnow):
self.assertEqual(gid, 33333)
mock_stat.assert_called_once_with(test_file)
+ @mock.patch('cinder.utils.CONF')
+ def test_get_root_helper(self, mock_conf):
+ mock_conf.rootwrap_config = '/path/to/conf'
+ self.assertEqual('sudo cinder-rootwrap /path/to/conf',
+ utils.get_root_helper())
+
+
+class TemporaryChownTestCase(test.TestCase):
+ @mock.patch('os.stat')
+ @mock.patch('os.getuid', return_value=1234)
+ @mock.patch('cinder.utils.execute')
+ def test_get_uid(self, mock_exec, mock_getuid, mock_stat):
+ mock_stat.return_value.st_uid = 5678
+ test_filename = 'a_file'
+ with utils.temporary_chown(test_filename):
+ mock_exec.assert_called_once_with('chown', 1234, test_filename,
+ run_as_root=True)
+ mock_getuid.asset_called_once_with()
+ mock_stat.assert_called_once_with(test_filename)
+ calls = [mock.call('chown', 1234, test_filename, run_as_root=True),
+ mock.call('chown', 5678, test_filename, run_as_root=True)]
+ mock_exec.assert_has_calls(calls)
+
+ @mock.patch('os.stat')
+ @mock.patch('os.getuid', return_value=1234)
+ @mock.patch('cinder.utils.execute')
+ def test_supplied_owner_uid(self, mock_exec, mock_getuid, mock_stat):
+ mock_stat.return_value.st_uid = 5678
+ test_filename = 'a_file'
+ with utils.temporary_chown(test_filename, owner_uid=9101):
+ mock_exec.assert_called_once_with('chown', 9101, test_filename,
+ run_as_root=True)
+ self.assertFalse(mock_getuid.called)
+ mock_stat.assert_called_once_with(test_filename)
+ calls = [mock.call('chown', 9101, test_filename, run_as_root=True),
+ mock.call('chown', 5678, test_filename, run_as_root=True)]
+ mock_exec.assert_has_calls(calls)
+
+ @mock.patch('os.stat')
+ @mock.patch('os.getuid', return_value=5678)
+ @mock.patch('cinder.utils.execute')
+ def test_matching_uid(self, mock_exec, mock_getuid, mock_stat):
+ mock_stat.return_value.st_uid = 5678
+ test_filename = 'a_file'
+ with utils.temporary_chown(test_filename):
+ pass
+ mock_getuid.asset_called_once_with()
+ mock_stat.assert_called_once_with(test_filename)
+ self.assertFalse(mock_exec.called)
+
+
+class TempdirTestCase(test.TestCase):
+ @mock.patch('tempfile.mkdtemp')
+ @mock.patch('shutil.rmtree')
+ def test_tempdir(self, mock_rmtree, mock_mkdtemp):
+ with utils.tempdir(a='1', b=2) as td:
+ self.assertEqual(mock_mkdtemp.return_value, td)
+ self.assertFalse(mock_rmtree.called)
+ mock_mkdtemp.assert_called_once_with(a='1', b=2)
+ mock_rmtree.assert_called_once_with(mock_mkdtemp.return_value)
+
+ @mock.patch('tempfile.mkdtemp')
+ @mock.patch('shutil.rmtree', side_effect=OSError)
+ def test_tempdir_error(self, mock_rmtree, mock_mkdtemp):
+ with utils.tempdir(a='1', b=2) as td:
+ self.assertEqual(mock_mkdtemp.return_value, td)
+ self.assertFalse(mock_rmtree.called)
+ mock_mkdtemp.assert_called_once_with(a='1', b=2)
+ mock_rmtree.assert_called_once_with(mock_mkdtemp.return_value)
+
+
+class WalkClassHierarchyTestCase(test.TestCase):
+ def test_walk_class_hierarchy(self):
+ class A(object):
+ pass
+
+ class B(A):
+ pass
+
+ class C(A):
+ pass
+
+ class D(B):
+ pass
+
+ class E(A):
+ pass
+
+ class_pairs = zip((D, B, E),
+ utils.walk_class_hierarchy(A, encountered=[C]))
+ for actual, expected in class_pairs:
+ self.assertEqual(actual, expected)
+
+ class_pairs = zip((D, B, C, E), utils.walk_class_hierarchy(A))
+ for actual, expected in class_pairs:
+ self.assertEqual(actual, expected)
+
+
+class GetDiskOfPartitionTestCase(test.TestCase):
+ def test_devpath_is_diskpath(self):
+ devpath = '/some/path'
+ st_mock = mock.Mock()
+ output = utils._get_disk_of_partition(devpath, st_mock)
+ self.assertEqual('/some/path', output[0])
+ self.assertIs(st_mock, output[1])
+
+ with mock.patch('os.stat') as mock_stat:
+ devpath = '/some/path'
+ output = utils._get_disk_of_partition(devpath)
+ mock_stat.assert_called_once_with(devpath)
+ self.assertEqual(devpath, output[0])
+ self.assertIs(mock_stat.return_value, output[1])
+
+ @mock.patch('os.stat', side_effect=OSError)
+ def test_stat_oserror(self, mock_stat):
+ st_mock = mock.Mock()
+ devpath = '/some/path1'
+ output = utils._get_disk_of_partition(devpath, st_mock)
+ mock_stat.assert_called_once_with('/some/path')
+ self.assertEqual(devpath, output[0])
+ self.assertIs(st_mock, output[1])
+
+ @mock.patch('stat.S_ISBLK', return_value=True)
+ @mock.patch('os.stat')
+ def test_diskpath_is_block_device(self, mock_stat, mock_isblk):
+ st_mock = mock.Mock()
+ devpath = '/some/path1'
+ output = utils._get_disk_of_partition(devpath, st_mock)
+ self.assertEqual('/some/path', output[0])
+ self.assertEqual(mock_stat.return_value, output[1])
+
+ @mock.patch('stat.S_ISBLK', return_value=False)
+ @mock.patch('os.stat')
+ def test_diskpath_is_not_block_device(self, mock_stat, mock_isblk):
+ st_mock = mock.Mock()
+ devpath = '/some/path1'
+ output = utils._get_disk_of_partition(devpath, st_mock)
+ self.assertEqual(devpath, output[0])
+ self.assertEqual(st_mock, output[1])
+
+
+class GetBlkdevMajorMinorTestCase(test.TestCase):
@mock.patch('os.stat')
def test_get_blkdev_major_minor(self, mock_stat):
dev = self._test_get_blkdev_major_minor_file('nfs-server:/export/path')
self.assertIsNone(dev)
-
-class GetDiskOfPartitionTestCase(test.TestCase):
- def test_devpath_is_diskpath(self):
- devpath = '/some/path'
- st_mock = mock.Mock()
- output = utils._get_disk_of_partition(devpath, st_mock)
- self.assertEqual('/some/path', output[0])
- self.assertIs(st_mock, output[1])
-
- with mock.patch('os.stat') as mock_stat:
- devpath = '/some/path'
- output = utils._get_disk_of_partition(devpath)
- mock_stat.assert_called_once_with(devpath)
- self.assertEqual(devpath, output[0])
- self.assertIs(mock_stat.return_value, output[1])
-
- @mock.patch('os.stat', side_effect=OSError)
- def test_stat_oserror(self, mock_stat):
- st_mock = mock.Mock()
- devpath = '/some/path1'
- output = utils._get_disk_of_partition(devpath, st_mock)
- mock_stat.assert_called_once_with('/some/path')
- self.assertEqual(devpath, output[0])
- self.assertIs(st_mock, output[1])
-
- @mock.patch('stat.S_ISBLK', return_value=True)
@mock.patch('os.stat')
- def test_diskpath_is_block_device(self, mock_stat, mock_isblk):
- st_mock = mock.Mock()
- devpath = '/some/path1'
- output = utils._get_disk_of_partition(devpath, st_mock)
- self.assertEqual('/some/path', output[0])
- self.assertEqual(mock_stat.return_value, output[1])
-
+ @mock.patch('stat.S_ISCHR', return_value=False)
@mock.patch('stat.S_ISBLK', return_value=False)
+ def test_get_blkdev_failure(self, mock_isblk, mock_ischr, mock_stat):
+ path = '/some/path'
+ self.assertRaises(exception.Error,
+ utils.get_blkdev_major_minor,
+ path, lookup_for_file=False)
+ mock_stat.assert_called_once_with(path)
+ mock_isblk.assert_called_once_with(mock_stat.return_value.st_mode)
+ mock_ischr.assert_called_once_with(mock_stat.return_value.st_mode)
+
@mock.patch('os.stat')
- def test_diskpath_is_not_block_device(self, mock_stat, mock_isblk):
- st_mock = mock.Mock()
- devpath = '/some/path1'
- output = utils._get_disk_of_partition(devpath, st_mock)
- self.assertEqual(devpath, output[0])
- self.assertEqual(st_mock, output[1])
+ @mock.patch('stat.S_ISCHR', return_value=True)
+ @mock.patch('stat.S_ISBLK', return_value=False)
+ def test_get_blkdev_is_chr(self, mock_isblk, mock_ischr, mock_stat):
+ path = '/some/path'
+ output = utils.get_blkdev_major_minor(path, lookup_for_file=False)
+ mock_stat.assert_called_once_with(path)
+ mock_isblk.assert_called_once_with(mock_stat.return_value.st_mode)
+ mock_ischr.assert_called_once_with(mock_stat.return_value.st_mode)
+ self.assertIs(None, output)
class MonkeyPatchTestCase(test.TestCase):
month=2,
year=2012))
+ @mock.patch('oslo.utils.timeutils.utcnow',
+ return_value=datetime.datetime(day=1,
+ month=1,
+ year=2012))
+ def test_month_jan_day_first(self, mock_utcnow):
+ begin, end = utils.last_completed_audit_period(unit='month')
+ self.assertEqual(datetime.datetime(day=1, month=11, year=2011), begin)
+ self.assertEqual(datetime.datetime(day=1, month=12, year=2011), end)
+
+ @mock.patch('oslo.utils.timeutils.utcnow',
+ return_value=datetime.datetime(day=2,
+ month=1,
+ year=2012))
+ def test_month_jan_day_not_first(self, mock_utcnow):
+ begin, end = utils.last_completed_audit_period(unit='month')
+ self.assertEqual(datetime.datetime(day=1, month=12, year=2011), begin)
+ self.assertEqual(datetime.datetime(day=1, month=1, year=2012), end)
+
def test_year(self):
begin, end = utils.last_completed_audit_period(unit='year')
self.assertEqual(begin, datetime.datetime(day=1,
month=6,
year=2011))
+ def test_invalid_unit(self):
+ self.assertRaises(ValueError,
+ utils.last_completed_audit_period,
+ unit='invalid_unit')
+
+ @mock.patch('cinder.utils.CONF')
+ def test_uses_conf_unit(self, mock_conf):
+ mock_conf.volume_usage_audit_period = 'hour'
+ begin1, end1 = utils.last_completed_audit_period()
+ self.assertEqual(60.0 * 60, (end1 - begin1).total_seconds())
+
+ mock_conf.volume_usage_audit_period = 'day'
+ begin2, end2 = utils.last_completed_audit_period()
+
+ self.assertEqual(60.0 * 60 * 24, (end2 - begin2).total_seconds())
+
class FakeSSHClient(object):
wrapper functions.
"""
- def test_brick_get_connector_properties(self):
-
- self.mox.StubOutWithMock(socket, 'gethostname')
- socket.gethostname().AndReturn('fakehost')
-
- self.mox.StubOutWithMock(connector.ISCSIConnector, 'get_initiator')
- connector.ISCSIConnector.get_initiator().AndReturn('fakeinitiator')
-
- self.mox.StubOutWithMock(linuxfc.LinuxFibreChannel, 'get_fc_wwpns')
- linuxfc.LinuxFibreChannel.get_fc_wwpns().AndReturn(None)
-
- self.mox.StubOutWithMock(linuxfc.LinuxFibreChannel, 'get_fc_wwnns')
- linuxfc.LinuxFibreChannel.get_fc_wwnns().AndReturn(None)
-
- props = {'initiator': 'fakeinitiator',
- 'host': 'fakehost',
- 'ip': CONF.my_ip,
- }
-
- self.mox.ReplayAll()
- props_actual = utils.brick_get_connector_properties()
- self.assertEqual(props, props_actual)
- self.mox.VerifyAll()
-
- def test_brick_get_connector(self):
-
- root_helper = utils.get_root_helper()
-
- self.mox.StubOutClassWithMocks(connector, 'ISCSIConnector')
- connector.ISCSIConnector(execute=putils.execute,
- driver=None,
- root_helper=root_helper,
- use_multipath=False,
- device_scan_attempts=3)
-
- self.mox.StubOutClassWithMocks(connector, 'FibreChannelConnector')
- connector.FibreChannelConnector(execute=putils.execute,
- driver=None,
- root_helper=root_helper,
- use_multipath=False,
- device_scan_attempts=3)
-
- self.mox.StubOutClassWithMocks(connector, 'AoEConnector')
- connector.AoEConnector(execute=putils.execute,
- driver=None,
- root_helper=root_helper,
- device_scan_attempts=3)
-
- self.mox.StubOutClassWithMocks(connector, 'LocalConnector')
- connector.LocalConnector(execute=putils.execute,
- driver=None,
- root_helper=root_helper,
- device_scan_attempts=3)
-
- self.mox.ReplayAll()
- utils.brick_get_connector('iscsi')
- utils.brick_get_connector('fibre_channel')
- utils.brick_get_connector('aoe')
- utils.brick_get_connector('local')
- self.mox.VerifyAll()
+ @mock.patch('cinder.utils.CONF')
+ @mock.patch('cinder.brick.initiator.connector.get_connector_properties')
+ @mock.patch('cinder.utils.get_root_helper')
+ def test_brick_get_connector_properties(self, mock_helper, mock_get,
+ mock_conf):
+ mock_conf.my_ip = '1.2.3.4'
+ output = utils.brick_get_connector_properties()
+ mock_helper.assert_called_once_with()
+ mock_get.assert_called_once_with(mock_helper.return_value, '1.2.3.4')
+ self.assertEqual(mock_get.return_value, output)
+
+ @mock.patch('cinder.brick.initiator.connector.InitiatorConnector.factory')
+ @mock.patch('cinder.utils.get_root_helper')
+ def test_brick_get_connector(self, mock_helper, mock_factory):
+ output = utils.brick_get_connector('protocol')
+ mock_helper.assert_called_once_with()
+ self.assertEqual(mock_factory.return_value, output)
+ mock_factory.assert_called_once_with(
+ 'protocol', mock_helper.return_value, driver=None,
+ execute=putils.execute, use_multipath=False,
+ device_scan_attempts=3)
class StringLengthTestCase(test.TestCase):
self.assertRaises(exception.InvalidInput,
utils.check_string_length,
'a' * 256, 'name', max_length=255)
+
+
+class AddVisibleAdminMetadataTestCase(test.TestCase):
+ def test_add_visible_admin_metadata_visible_key_only(self):
+ admin_metadata = [{"key": "invisible_key", "value": "invisible_value"},
+ {"key": "readonly", "value": "visible"},
+ {"key": "attached_mode", "value": "visible"}]
+ metadata = [{"key": "key", "value": "value"},
+ {"key": "readonly", "value": "existing"}]
+ volume = {'volume_admin_metadata': admin_metadata,
+ 'volume_metadata': metadata}
+ utils.add_visible_admin_metadata(volume)
+ self.assertEqual([{"key": "key", "value": "value"},
+ {"key": "readonly", "value": "visible"},
+ {"key": "attached_mode", "value": "visible"}],
+ volume['volume_metadata'])
+
+ admin_metadata = {"invisible_key": "invisible_value",
+ "readonly": "visible",
+ "attached_mode": "visible"}
+ metadata = {"key": "value", "readonly": "existing"}
+ volume = {'admin_metadata': admin_metadata,
+ 'metadata': metadata}
+ utils.add_visible_admin_metadata(volume)
+ self.assertEqual({'key': 'value',
+ 'attached_mode': 'visible',
+ 'readonly': 'visible'},
+ volume['metadata'])
+
+ def test_add_visible_admin_metadata_no_visible_keys(self):
+ admin_metadata = [
+ {"key": "invisible_key1", "value": "invisible_value1"},
+ {"key": "invisible_key2", "value": "invisible_value2"},
+ {"key": "invisible_key3", "value": "invisible_value3"}]
+ metadata = [{"key": "key", "value": "value"}]
+ volume = {'volume_admin_metadata': admin_metadata,
+ 'volume_metadata': metadata}
+ utils.add_visible_admin_metadata(volume)
+ self.assertEqual([{"key": "key", "value": "value"}],
+ volume['volume_metadata'])
+
+ admin_metadata = {"invisible_key1": "invisible_value1",
+ "invisible_key2": "invisible_value2",
+ "invisible_key3": "invisible_value3"}
+ metadata = {"key": "value"}
+ volume = {'admin_metadata': admin_metadata,
+ 'metadata': metadata}
+ utils.add_visible_admin_metadata(volume)
+ self.assertEqual({'key': 'value'}, volume['metadata'])
+
+ def test_add_visible_admin_metadata_no_existing_metadata(self):
+ admin_metadata = [{"key": "invisible_key", "value": "invisible_value"},
+ {"key": "readonly", "value": "visible"},
+ {"key": "attached_mode", "value": "visible"}]
+ volume = {'volume_admin_metadata': admin_metadata}
+ utils.add_visible_admin_metadata(volume)
+ self.assertEqual({'attached_mode': 'visible', 'readonly': 'visible'},
+ volume['metadata'])
+
+ admin_metadata = {"invisible_key": "invisible_value",
+ "readonly": "visible",
+ "attached_mode": "visible"}
+ volume = {'admin_metadata': admin_metadata}
+ utils.add_visible_admin_metadata(volume)
+ self.assertEqual({'attached_mode': 'visible', 'readonly': 'visible'},
+ volume['metadata'])
+
+
+class InvalidFilterTestCase(test.TestCase):
+ def test_admin_allows_all_options(self):
+ ctxt = mock.Mock(name='context')
+ ctxt.is_admin = True
+
+ filters = {'allowed1': None, 'allowed2': None, 'not_allowed1': None}
+ fltrs_orig = {'allowed1': None, 'allowed2': None, 'not_allowed1': None}
+ allowed_search_options = ('allowed1', 'allowed2')
+ allowed_orig = ('allowed1', 'allowed2')
+
+ utils.remove_invalid_filter_options(ctxt, filters,
+ allowed_search_options)
+
+ self.assertEqual(allowed_orig, allowed_search_options)
+ self.assertEqual(fltrs_orig, filters)
+
+ def test_admin_allows_some_options(self):
+ ctxt = mock.Mock(name='context')
+ ctxt.is_admin = False
+
+ filters = {'allowed1': None, 'allowed2': None, 'not_allowed1': None}
+ fltrs_orig = {'allowed1': None, 'allowed2': None, 'not_allowed1': None}
+ allowed_search_options = ('allowed1', 'allowed2')
+ allowed_orig = ('allowed1', 'allowed2')
+
+ utils.remove_invalid_filter_options(ctxt, filters,
+ allowed_search_options)
+
+ self.assertEqual(allowed_orig, allowed_search_options)
+ self.assertNotEqual(fltrs_orig, filters)
+ self.assertEqual(allowed_search_options, tuple(sorted(filters.keys())))
+
+
+class IsBlkDeviceTestCase(test.TestCase):
+ @mock.patch('stat.S_ISBLK', return_value=True)
+ @mock.patch('os.stat')
+ def test_is_blk_device(self, mock_os_stat, mock_S_ISBLK):
+ dev = 'some_device'
+ self.assertTrue(utils.is_blk_device(dev))
+
+ @mock.patch('stat.S_ISBLK', return_value=False)
+ @mock.patch('os.stat')
+ def test_not_is_blk_device(self, mock_os_stat, mock_S_ISBLK):
+ dev = 'not_some_device'
+ self.assertFalse(utils.is_blk_device(dev))
+
+ @mock.patch('stat.S_ISBLK', side_effect=Exception)
+ @mock.patch('os.stat')
+ def test_fail_is_blk_device(self, mock_os_stat, mock_S_ISBLK):
+ dev = 'device_exception'
+ self.assertFalse(utils.is_blk_device(dev))