From: git-harry Date: Tue, 2 Dec 2014 17:41:20 +0000 (+0000) Subject: Improve testing of cinder/utils.py X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=22f968eae5abc6ad3d8846cb32fcdfee1d951e41;p=openstack-build%2Fcinder-build.git Improve testing of cinder/utils.py This commit increases code coverage of cinder/utils.py as well as removing any use of mox and replacing tests that are more functional in nature with tests more focussed on testing the function in cinder/utils.py. Change-Id: I57bb589fa728d715f07aadc4abd0e27670a3a5b6 --- diff --git a/cinder/tests/api/v1/test_volumes.py b/cinder/tests/api/v1/test_volumes.py index a0e70bb25..e3a7b11b9 100644 --- a/cinder/tests/api/v1/test_volumes.py +++ b/cinder/tests/api/v1/test_volumes.py @@ -29,7 +29,6 @@ from cinder.tests.api import fakes from cinder.tests.api.v2 import stubs from cinder.tests import fake_notifier from cinder.tests.image import fake as fake_image -from cinder import utils from cinder.volume import api as volume_api @@ -762,31 +761,6 @@ class VolumeApiTest(test.TestCase): self.assertIn('volumes', res) self.assertEqual(1, len(res['volumes'])) - def test_add_visible_admin_metadata_visible_key_only(self): - admin_metadata = [{"key": "invisible_key", "value": "invisible_value"}, - {"key": "readonly", "value": "visible"}, - {"key": "attached_mode", "value": "visible"}] - metadata = [{"key": "key", "value": "value"}] - volume = dict(volume_admin_metadata=admin_metadata, - volume_metadata=metadata) - utils.add_visible_admin_metadata(volume) - self.assertEqual(volume['volume_metadata'], - [{"key": "key", "value": "value"}, - {"key": "readonly", "value": "visible"}, - {"key": "attached_mode", "value": "visible"}]) - - admin_metadata = {"invisible_key": "invisible_value", - "readonly": "visible", - "attached_mode": "visible"} - metadata = {"key": "value"} - volume = dict(admin_metadata=admin_metadata, - metadata=metadata) - utils.add_visible_admin_metadata(volume) - self.assertEqual(volume['metadata'], - {'key': 'value', - 'attached_mode': 'visible', - 'readonly': 'visible'}) - class VolumeSerializerTest(test.TestCase): def _verify_volume_attachment(self, attach, tree): diff --git a/cinder/tests/api/v2/test_volumes.py b/cinder/tests/api/v2/test_volumes.py index 96aa90d48..1c38dda11 100644 --- a/cinder/tests/api/v2/test_volumes.py +++ b/cinder/tests/api/v2/test_volumes.py @@ -32,7 +32,6 @@ from cinder.tests.api import fakes from cinder.tests.api.v2 import stubs from cinder.tests import fake_notifier from cinder.tests.image import fake as fake_image -from cinder import utils from cinder.volume import api as volume_api CONF = cfg.CONF @@ -1542,32 +1541,6 @@ class VolumeApiTest(test.TestCase): body = {'volume': 'string'} self._create_volume_bad_request(body=body) - def test_add_visible_admin_metadata_visible_key_only(self): - admin_metadata = [{"key": "invisible_key", "value": "invisible_value"}, - {"key": "readonly", "value": "visible"}, - {"key": "attached_mode", "value": "visible"}] - metadata = [{"key": "key", "value": "value"}] - volume = dict(volume_admin_metadata=admin_metadata, - volume_metadata=metadata) - utils.add_visible_admin_metadata(volume) - - self.assertEqual(volume['volume_metadata'], - [{"key": "key", "value": "value"}, - {"key": "readonly", "value": "visible"}, - {"key": "attached_mode", "value": "visible"}]) - - admin_metadata = {"invisible_key": "invisible_value", - "readonly": "visible", - "attached_mode": "visible"} - metadata = {"key": "value"} - volume = dict(admin_metadata=admin_metadata, - metadata=metadata) - utils.add_visible_admin_metadata(volume) - self.assertEqual(volume['metadata'], - {'key': 'value', - 'attached_mode': 'visible', - 'readonly': 'visible'}) - class VolumeSerializerTest(test.TestCase): def _verify_volume_attachment(self, attach, tree): diff --git a/cinder/tests/test_utils.py b/cinder/tests/test_utils.py index 6e02759f1..29ce54dc2 100644 --- a/cinder/tests/test_utils.py +++ b/cinder/tests/test_utils.py @@ -16,8 +16,6 @@ import datetime import hashlib import os -import socket -import tempfile import uuid import mock @@ -28,8 +26,6 @@ import paramiko import six import cinder -from cinder.brick.initiator import connector -from cinder.brick.initiator import linuxfc from cinder import exception from cinder import ssh_utils from cinder import test @@ -39,93 +35,34 @@ from cinder import utils CONF = cfg.CONF -def _get_local_mock_open(fake_data='abcd efgh'): - mock_context_manager = mock.Mock() - mock_context_manager.__enter__ = mock.Mock( - return_value=six.StringIO(fake_data)) - mock_context_manager.__exit__ = mock.Mock(return_value=False) - return mock_context_manager - - class ExecuteTestCase(test.TestCase): - def test_retry_on_failure(self): - fd, tmpfilename = tempfile.mkstemp() - _, tmpfilename2 = tempfile.mkstemp() - try: - fp = os.fdopen(fd, 'w+') - fp.write('''#!/bin/sh -# If stdin fails to get passed during one of the runs, make a note. -if ! grep -q foo -then - echo 'failure' > "$1" -fi -# If stdin has failed to get passed during this or a previous run, exit early. -if grep failure "$1" -then - exit 1 -fi -runs="$(cat $1)" -if [ -z "$runs" ] -then - runs=0 -fi -runs=$(($runs + 1)) -echo $runs > "$1" -exit 1 -''') - fp.close() - os.chmod(tmpfilename, 0o755) - self.assertRaises(putils.ProcessExecutionError, - utils.execute, - tmpfilename, tmpfilename2, attempts=10, - process_input='foo', - delay_on_retry=False) - fp = open(tmpfilename2, 'r+') - runs = fp.read() - fp.close() - self.assertNotEqual(runs.strip(), 'failure', 'stdin did not ' - 'always get passed ' - 'correctly') - runs = int(runs.strip()) - self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,)) - finally: - os.unlink(tmpfilename) - os.unlink(tmpfilename2) - - def test_unknown_kwargs_raises_error(self): - self.assertRaises(putils.UnknownArgumentError, - utils.execute, - '/usr/bin/env', 'true', - this_is_not_a_valid_kwarg=True) - - def test_check_exit_code_boolean(self): - utils.execute('/usr/bin/env', 'false', check_exit_code=False) - self.assertRaises(putils.ProcessExecutionError, - utils.execute, - '/usr/bin/env', 'false', check_exit_code=True) - - def test_no_retry_on_success(self): - fd, tmpfilename = tempfile.mkstemp() - _, tmpfilename2 = tempfile.mkstemp() - try: - fp = os.fdopen(fd, 'w+') - fp.write('''#!/bin/sh -# If we've already run, bail out. -grep -q foo "$1" && exit 1 -# Mark that we've run before. -echo foo > "$1" -# Check that stdin gets passed correctly. -grep foo -''') - fp.close() - os.chmod(tmpfilename, 0o755) - utils.execute(tmpfilename, - tmpfilename2, - process_input='foo', - attempts=2) - finally: - os.unlink(tmpfilename) - os.unlink(tmpfilename2) + @mock.patch('cinder.utils.processutils.execute') + def test_execute(self, mock_putils_exe): + output = utils.execute('a', 1, foo='bar') + self.assertEqual(mock_putils_exe.return_value, output) + mock_putils_exe.assert_called_once_with('a', 1, foo='bar') + + @mock.patch('cinder.utils.get_root_helper') + @mock.patch('cinder.utils.processutils.execute') + def test_execute_root(self, mock_putils_exe, mock_get_helper): + output = utils.execute('a', 1, foo='bar', run_as_root=True) + self.assertEqual(mock_putils_exe.return_value, output) + mock_helper = mock_get_helper.return_value + mock_putils_exe.assert_called_once_with('a', 1, foo='bar', + run_as_root=True, + root_helper=mock_helper) + + @mock.patch('cinder.utils.get_root_helper') + @mock.patch('cinder.utils.processutils.execute') + def test_execute_root_and_helper(self, mock_putils_exe, mock_get_helper): + mock_helper = mock.Mock() + output = utils.execute('a', 1, foo='bar', run_as_root=True, + root_helper=mock_helper) + self.assertEqual(mock_putils_exe.return_value, output) + self.assertFalse(mock_get_helper.called) + mock_putils_exe.assert_called_once_with('a', 1, foo='bar', + run_as_root=True, + root_helper=mock_helper) class GetFromPathTestCase(test.TestCase): @@ -309,6 +246,16 @@ class GenericUtilsTestCase(test.TestCase): obj, quiet=False) + def test_is_int_like(self): + self.assertTrue(utils.is_int_like(1)) + self.assertTrue(utils.is_int_like(-1)) + self.assertTrue(utils.is_int_like(0b1)) + self.assertTrue(utils.is_int_like(0o1)) + self.assertTrue(utils.is_int_like(0x1)) + self.assertTrue(utils.is_int_like('1')) + self.assertFalse(utils.is_int_like(1.0)) + self.assertFalse(utils.is_int_like('abc')) + def test_check_exclusive_options(self): utils.check_exclusive_options() utils.check_exclusive_options(something=None, @@ -361,34 +308,50 @@ class GenericUtilsTestCase(test.TestCase): hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>" self.assertEqual("hello", utils.sanitize_hostname(hostname)) + def test_is_valid_boolstr(self): + self.assertTrue(utils.is_valid_boolstr(True)) + self.assertTrue(utils.is_valid_boolstr('trUe')) + self.assertTrue(utils.is_valid_boolstr(False)) + self.assertTrue(utils.is_valid_boolstr('faLse')) + self.assertTrue(utils.is_valid_boolstr('yeS')) + self.assertTrue(utils.is_valid_boolstr('nO')) + self.assertTrue(utils.is_valid_boolstr('y')) + self.assertTrue(utils.is_valid_boolstr('N')) + self.assertTrue(utils.is_valid_boolstr(1)) + self.assertTrue(utils.is_valid_boolstr('1')) + self.assertTrue(utils.is_valid_boolstr(0)) + self.assertTrue(utils.is_valid_boolstr('0')) + def test_generate_glance_url(self): generated_url = utils.generate_glance_url() actual_url = "http://%s:%d" % (CONF.glance_host, CONF.glance_port) self.assertEqual(generated_url, actual_url) - def test_read_file_as_root(self): - def fake_execute(*args, **kwargs): - if args[1] == 'bad': - raise putils.ProcessExecutionError - return 'fakecontents', None - - self.stubs.Set(utils, 'execute', fake_execute) - contents = utils.read_file_as_root('good') - self.assertEqual(contents, 'fakecontents') + @mock.patch('os.path.join', side_effect=lambda x, y: '/'.join((x, y))) + def test_make_dev_path(self, mock_join): + self.assertEqual('/dev/xvda', utils.make_dev_path('xvda')) + self.assertEqual('/dev/xvdb1', utils.make_dev_path('xvdb', 1)) + self.assertEqual('/foo/xvdc1', utils.make_dev_path('xvdc', 1, '/foo')) + + @mock.patch('cinder.utils.execute') + def test_read_file_as_root(self, mock_exec): + out = mock.Mock() + err = mock.Mock() + mock_exec.return_value = (out, err) + test_filepath = '/some/random/path' + output = utils.read_file_as_root(test_filepath) + mock_exec.assert_called_once_with('cat', test_filepath, + run_as_root=True) + self.assertEqual(out, output) + + @mock.patch('cinder.utils.execute', + side_effect=putils.ProcessExecutionError) + def test_read_file_as_root_fails(self, mock_exec): + test_filepath = '/some/random/path' self.assertRaises(exception.FileNotFound, - utils.read_file_as_root, 'bad') - - def test_temporary_chown(self): - def fake_execute(*args, **kwargs): - if args[0] == 'chown': - fake_execute.uid = args[1] - self.stubs.Set(utils, 'execute', fake_execute) - - with tempfile.NamedTemporaryFile() as f: - with utils.temporary_chown(f.name, owner_uid=2): - self.assertEqual(fake_execute.uid, 2) - self.assertEqual(fake_execute.uid, os.getuid()) + utils.read_file_as_root, + test_filepath) @mock.patch('oslo.utils.timeutils.utcnow') def test_service_is_up(self, mock_utcnow): @@ -541,6 +504,148 @@ class GenericUtilsTestCase(test.TestCase): self.assertEqual(gid, 33333) mock_stat.assert_called_once_with(test_file) + @mock.patch('cinder.utils.CONF') + def test_get_root_helper(self, mock_conf): + mock_conf.rootwrap_config = '/path/to/conf' + self.assertEqual('sudo cinder-rootwrap /path/to/conf', + utils.get_root_helper()) + + +class TemporaryChownTestCase(test.TestCase): + @mock.patch('os.stat') + @mock.patch('os.getuid', return_value=1234) + @mock.patch('cinder.utils.execute') + def test_get_uid(self, mock_exec, mock_getuid, mock_stat): + mock_stat.return_value.st_uid = 5678 + test_filename = 'a_file' + with utils.temporary_chown(test_filename): + mock_exec.assert_called_once_with('chown', 1234, test_filename, + run_as_root=True) + mock_getuid.asset_called_once_with() + mock_stat.assert_called_once_with(test_filename) + calls = [mock.call('chown', 1234, test_filename, run_as_root=True), + mock.call('chown', 5678, test_filename, run_as_root=True)] + mock_exec.assert_has_calls(calls) + + @mock.patch('os.stat') + @mock.patch('os.getuid', return_value=1234) + @mock.patch('cinder.utils.execute') + def test_supplied_owner_uid(self, mock_exec, mock_getuid, mock_stat): + mock_stat.return_value.st_uid = 5678 + test_filename = 'a_file' + with utils.temporary_chown(test_filename, owner_uid=9101): + mock_exec.assert_called_once_with('chown', 9101, test_filename, + run_as_root=True) + self.assertFalse(mock_getuid.called) + mock_stat.assert_called_once_with(test_filename) + calls = [mock.call('chown', 9101, test_filename, run_as_root=True), + mock.call('chown', 5678, test_filename, run_as_root=True)] + mock_exec.assert_has_calls(calls) + + @mock.patch('os.stat') + @mock.patch('os.getuid', return_value=5678) + @mock.patch('cinder.utils.execute') + def test_matching_uid(self, mock_exec, mock_getuid, mock_stat): + mock_stat.return_value.st_uid = 5678 + test_filename = 'a_file' + with utils.temporary_chown(test_filename): + pass + mock_getuid.asset_called_once_with() + mock_stat.assert_called_once_with(test_filename) + self.assertFalse(mock_exec.called) + + +class TempdirTestCase(test.TestCase): + @mock.patch('tempfile.mkdtemp') + @mock.patch('shutil.rmtree') + def test_tempdir(self, mock_rmtree, mock_mkdtemp): + with utils.tempdir(a='1', b=2) as td: + self.assertEqual(mock_mkdtemp.return_value, td) + self.assertFalse(mock_rmtree.called) + mock_mkdtemp.assert_called_once_with(a='1', b=2) + mock_rmtree.assert_called_once_with(mock_mkdtemp.return_value) + + @mock.patch('tempfile.mkdtemp') + @mock.patch('shutil.rmtree', side_effect=OSError) + def test_tempdir_error(self, mock_rmtree, mock_mkdtemp): + with utils.tempdir(a='1', b=2) as td: + self.assertEqual(mock_mkdtemp.return_value, td) + self.assertFalse(mock_rmtree.called) + mock_mkdtemp.assert_called_once_with(a='1', b=2) + mock_rmtree.assert_called_once_with(mock_mkdtemp.return_value) + + +class WalkClassHierarchyTestCase(test.TestCase): + def test_walk_class_hierarchy(self): + class A(object): + pass + + class B(A): + pass + + class C(A): + pass + + class D(B): + pass + + class E(A): + pass + + class_pairs = zip((D, B, E), + utils.walk_class_hierarchy(A, encountered=[C])) + for actual, expected in class_pairs: + self.assertEqual(actual, expected) + + class_pairs = zip((D, B, C, E), utils.walk_class_hierarchy(A)) + for actual, expected in class_pairs: + self.assertEqual(actual, expected) + + +class GetDiskOfPartitionTestCase(test.TestCase): + def test_devpath_is_diskpath(self): + devpath = '/some/path' + st_mock = mock.Mock() + output = utils._get_disk_of_partition(devpath, st_mock) + self.assertEqual('/some/path', output[0]) + self.assertIs(st_mock, output[1]) + + with mock.patch('os.stat') as mock_stat: + devpath = '/some/path' + output = utils._get_disk_of_partition(devpath) + mock_stat.assert_called_once_with(devpath) + self.assertEqual(devpath, output[0]) + self.assertIs(mock_stat.return_value, output[1]) + + @mock.patch('os.stat', side_effect=OSError) + def test_stat_oserror(self, mock_stat): + st_mock = mock.Mock() + devpath = '/some/path1' + output = utils._get_disk_of_partition(devpath, st_mock) + mock_stat.assert_called_once_with('/some/path') + self.assertEqual(devpath, output[0]) + self.assertIs(st_mock, output[1]) + + @mock.patch('stat.S_ISBLK', return_value=True) + @mock.patch('os.stat') + def test_diskpath_is_block_device(self, mock_stat, mock_isblk): + st_mock = mock.Mock() + devpath = '/some/path1' + output = utils._get_disk_of_partition(devpath, st_mock) + self.assertEqual('/some/path', output[0]) + self.assertEqual(mock_stat.return_value, output[1]) + + @mock.patch('stat.S_ISBLK', return_value=False) + @mock.patch('os.stat') + def test_diskpath_is_not_block_device(self, mock_stat, mock_isblk): + st_mock = mock.Mock() + devpath = '/some/path1' + output = utils._get_disk_of_partition(devpath, st_mock) + self.assertEqual(devpath, output[0]) + self.assertEqual(st_mock, output[1]) + + +class GetBlkdevMajorMinorTestCase(test.TestCase): @mock.patch('os.stat') def test_get_blkdev_major_minor(self, mock_stat): @@ -602,48 +707,28 @@ class GenericUtilsTestCase(test.TestCase): dev = self._test_get_blkdev_major_minor_file('nfs-server:/export/path') self.assertIsNone(dev) - -class GetDiskOfPartitionTestCase(test.TestCase): - def test_devpath_is_diskpath(self): - devpath = '/some/path' - st_mock = mock.Mock() - output = utils._get_disk_of_partition(devpath, st_mock) - self.assertEqual('/some/path', output[0]) - self.assertIs(st_mock, output[1]) - - with mock.patch('os.stat') as mock_stat: - devpath = '/some/path' - output = utils._get_disk_of_partition(devpath) - mock_stat.assert_called_once_with(devpath) - self.assertEqual(devpath, output[0]) - self.assertIs(mock_stat.return_value, output[1]) - - @mock.patch('os.stat', side_effect=OSError) - def test_stat_oserror(self, mock_stat): - st_mock = mock.Mock() - devpath = '/some/path1' - output = utils._get_disk_of_partition(devpath, st_mock) - mock_stat.assert_called_once_with('/some/path') - self.assertEqual(devpath, output[0]) - self.assertIs(st_mock, output[1]) - - @mock.patch('stat.S_ISBLK', return_value=True) @mock.patch('os.stat') - def test_diskpath_is_block_device(self, mock_stat, mock_isblk): - st_mock = mock.Mock() - devpath = '/some/path1' - output = utils._get_disk_of_partition(devpath, st_mock) - self.assertEqual('/some/path', output[0]) - self.assertEqual(mock_stat.return_value, output[1]) - + @mock.patch('stat.S_ISCHR', return_value=False) @mock.patch('stat.S_ISBLK', return_value=False) + def test_get_blkdev_failure(self, mock_isblk, mock_ischr, mock_stat): + path = '/some/path' + self.assertRaises(exception.Error, + utils.get_blkdev_major_minor, + path, lookup_for_file=False) + mock_stat.assert_called_once_with(path) + mock_isblk.assert_called_once_with(mock_stat.return_value.st_mode) + mock_ischr.assert_called_once_with(mock_stat.return_value.st_mode) + @mock.patch('os.stat') - def test_diskpath_is_not_block_device(self, mock_stat, mock_isblk): - st_mock = mock.Mock() - devpath = '/some/path1' - output = utils._get_disk_of_partition(devpath, st_mock) - self.assertEqual(devpath, output[0]) - self.assertEqual(st_mock, output[1]) + @mock.patch('stat.S_ISCHR', return_value=True) + @mock.patch('stat.S_ISBLK', return_value=False) + def test_get_blkdev_is_chr(self, mock_isblk, mock_ischr, mock_stat): + path = '/some/path' + output = utils.get_blkdev_major_minor(path, lookup_for_file=False) + mock_stat.assert_called_once_with(path) + mock_isblk.assert_called_once_with(mock_stat.return_value.st_mode) + mock_ischr.assert_called_once_with(mock_stat.return_value.st_mode) + self.assertIs(None, output) class MonkeyPatchTestCase(test.TestCase): @@ -804,6 +889,24 @@ class AuditPeriodTest(test.TestCase): month=2, year=2012)) + @mock.patch('oslo.utils.timeutils.utcnow', + return_value=datetime.datetime(day=1, + month=1, + year=2012)) + def test_month_jan_day_first(self, mock_utcnow): + begin, end = utils.last_completed_audit_period(unit='month') + self.assertEqual(datetime.datetime(day=1, month=11, year=2011), begin) + self.assertEqual(datetime.datetime(day=1, month=12, year=2011), end) + + @mock.patch('oslo.utils.timeutils.utcnow', + return_value=datetime.datetime(day=2, + month=1, + year=2012)) + def test_month_jan_day_not_first(self, mock_utcnow): + begin, end = utils.last_completed_audit_period(unit='month') + self.assertEqual(datetime.datetime(day=1, month=12, year=2011), begin) + self.assertEqual(datetime.datetime(day=1, month=1, year=2012), end) + def test_year(self): begin, end = utils.last_completed_audit_period(unit='year') self.assertEqual(begin, datetime.datetime(day=1, @@ -831,6 +934,22 @@ class AuditPeriodTest(test.TestCase): month=6, year=2011)) + def test_invalid_unit(self): + self.assertRaises(ValueError, + utils.last_completed_audit_period, + unit='invalid_unit') + + @mock.patch('cinder.utils.CONF') + def test_uses_conf_unit(self, mock_conf): + mock_conf.volume_usage_audit_period = 'hour' + begin1, end1 = utils.last_completed_audit_period() + self.assertEqual(60.0 * 60, (end1 - begin1).total_seconds()) + + mock_conf.volume_usage_audit_period = 'day' + begin2, end2 = utils.last_completed_audit_period() + + self.assertEqual(60.0 * 60 * 24, (end2 - begin2).total_seconds()) + class FakeSSHClient(object): @@ -1102,66 +1221,27 @@ class BrickUtils(test.TestCase): wrapper functions. """ - def test_brick_get_connector_properties(self): - - self.mox.StubOutWithMock(socket, 'gethostname') - socket.gethostname().AndReturn('fakehost') - - self.mox.StubOutWithMock(connector.ISCSIConnector, 'get_initiator') - connector.ISCSIConnector.get_initiator().AndReturn('fakeinitiator') - - self.mox.StubOutWithMock(linuxfc.LinuxFibreChannel, 'get_fc_wwpns') - linuxfc.LinuxFibreChannel.get_fc_wwpns().AndReturn(None) - - self.mox.StubOutWithMock(linuxfc.LinuxFibreChannel, 'get_fc_wwnns') - linuxfc.LinuxFibreChannel.get_fc_wwnns().AndReturn(None) - - props = {'initiator': 'fakeinitiator', - 'host': 'fakehost', - 'ip': CONF.my_ip, - } - - self.mox.ReplayAll() - props_actual = utils.brick_get_connector_properties() - self.assertEqual(props, props_actual) - self.mox.VerifyAll() - - def test_brick_get_connector(self): - - root_helper = utils.get_root_helper() - - self.mox.StubOutClassWithMocks(connector, 'ISCSIConnector') - connector.ISCSIConnector(execute=putils.execute, - driver=None, - root_helper=root_helper, - use_multipath=False, - device_scan_attempts=3) - - self.mox.StubOutClassWithMocks(connector, 'FibreChannelConnector') - connector.FibreChannelConnector(execute=putils.execute, - driver=None, - root_helper=root_helper, - use_multipath=False, - device_scan_attempts=3) - - self.mox.StubOutClassWithMocks(connector, 'AoEConnector') - connector.AoEConnector(execute=putils.execute, - driver=None, - root_helper=root_helper, - device_scan_attempts=3) - - self.mox.StubOutClassWithMocks(connector, 'LocalConnector') - connector.LocalConnector(execute=putils.execute, - driver=None, - root_helper=root_helper, - device_scan_attempts=3) - - self.mox.ReplayAll() - utils.brick_get_connector('iscsi') - utils.brick_get_connector('fibre_channel') - utils.brick_get_connector('aoe') - utils.brick_get_connector('local') - self.mox.VerifyAll() + @mock.patch('cinder.utils.CONF') + @mock.patch('cinder.brick.initiator.connector.get_connector_properties') + @mock.patch('cinder.utils.get_root_helper') + def test_brick_get_connector_properties(self, mock_helper, mock_get, + mock_conf): + mock_conf.my_ip = '1.2.3.4' + output = utils.brick_get_connector_properties() + mock_helper.assert_called_once_with() + mock_get.assert_called_once_with(mock_helper.return_value, '1.2.3.4') + self.assertEqual(mock_get.return_value, output) + + @mock.patch('cinder.brick.initiator.connector.InitiatorConnector.factory') + @mock.patch('cinder.utils.get_root_helper') + def test_brick_get_connector(self, mock_helper, mock_factory): + output = utils.brick_get_connector('protocol') + mock_helper.assert_called_once_with() + self.assertEqual(mock_factory.return_value, output) + mock_factory.assert_called_once_with( + 'protocol', mock_helper.return_value, driver=None, + execute=putils.execute, use_multipath=False, + device_scan_attempts=3) class StringLengthTestCase(test.TestCase): @@ -1177,3 +1257,122 @@ class StringLengthTestCase(test.TestCase): self.assertRaises(exception.InvalidInput, utils.check_string_length, 'a' * 256, 'name', max_length=255) + + +class AddVisibleAdminMetadataTestCase(test.TestCase): + def test_add_visible_admin_metadata_visible_key_only(self): + admin_metadata = [{"key": "invisible_key", "value": "invisible_value"}, + {"key": "readonly", "value": "visible"}, + {"key": "attached_mode", "value": "visible"}] + metadata = [{"key": "key", "value": "value"}, + {"key": "readonly", "value": "existing"}] + volume = {'volume_admin_metadata': admin_metadata, + 'volume_metadata': metadata} + utils.add_visible_admin_metadata(volume) + self.assertEqual([{"key": "key", "value": "value"}, + {"key": "readonly", "value": "visible"}, + {"key": "attached_mode", "value": "visible"}], + volume['volume_metadata']) + + admin_metadata = {"invisible_key": "invisible_value", + "readonly": "visible", + "attached_mode": "visible"} + metadata = {"key": "value", "readonly": "existing"} + volume = {'admin_metadata': admin_metadata, + 'metadata': metadata} + utils.add_visible_admin_metadata(volume) + self.assertEqual({'key': 'value', + 'attached_mode': 'visible', + 'readonly': 'visible'}, + volume['metadata']) + + def test_add_visible_admin_metadata_no_visible_keys(self): + admin_metadata = [ + {"key": "invisible_key1", "value": "invisible_value1"}, + {"key": "invisible_key2", "value": "invisible_value2"}, + {"key": "invisible_key3", "value": "invisible_value3"}] + metadata = [{"key": "key", "value": "value"}] + volume = {'volume_admin_metadata': admin_metadata, + 'volume_metadata': metadata} + utils.add_visible_admin_metadata(volume) + self.assertEqual([{"key": "key", "value": "value"}], + volume['volume_metadata']) + + admin_metadata = {"invisible_key1": "invisible_value1", + "invisible_key2": "invisible_value2", + "invisible_key3": "invisible_value3"} + metadata = {"key": "value"} + volume = {'admin_metadata': admin_metadata, + 'metadata': metadata} + utils.add_visible_admin_metadata(volume) + self.assertEqual({'key': 'value'}, volume['metadata']) + + def test_add_visible_admin_metadata_no_existing_metadata(self): + admin_metadata = [{"key": "invisible_key", "value": "invisible_value"}, + {"key": "readonly", "value": "visible"}, + {"key": "attached_mode", "value": "visible"}] + volume = {'volume_admin_metadata': admin_metadata} + utils.add_visible_admin_metadata(volume) + self.assertEqual({'attached_mode': 'visible', 'readonly': 'visible'}, + volume['metadata']) + + admin_metadata = {"invisible_key": "invisible_value", + "readonly": "visible", + "attached_mode": "visible"} + volume = {'admin_metadata': admin_metadata} + utils.add_visible_admin_metadata(volume) + self.assertEqual({'attached_mode': 'visible', 'readonly': 'visible'}, + volume['metadata']) + + +class InvalidFilterTestCase(test.TestCase): + def test_admin_allows_all_options(self): + ctxt = mock.Mock(name='context') + ctxt.is_admin = True + + filters = {'allowed1': None, 'allowed2': None, 'not_allowed1': None} + fltrs_orig = {'allowed1': None, 'allowed2': None, 'not_allowed1': None} + allowed_search_options = ('allowed1', 'allowed2') + allowed_orig = ('allowed1', 'allowed2') + + utils.remove_invalid_filter_options(ctxt, filters, + allowed_search_options) + + self.assertEqual(allowed_orig, allowed_search_options) + self.assertEqual(fltrs_orig, filters) + + def test_admin_allows_some_options(self): + ctxt = mock.Mock(name='context') + ctxt.is_admin = False + + filters = {'allowed1': None, 'allowed2': None, 'not_allowed1': None} + fltrs_orig = {'allowed1': None, 'allowed2': None, 'not_allowed1': None} + allowed_search_options = ('allowed1', 'allowed2') + allowed_orig = ('allowed1', 'allowed2') + + utils.remove_invalid_filter_options(ctxt, filters, + allowed_search_options) + + self.assertEqual(allowed_orig, allowed_search_options) + self.assertNotEqual(fltrs_orig, filters) + self.assertEqual(allowed_search_options, tuple(sorted(filters.keys()))) + + +class IsBlkDeviceTestCase(test.TestCase): + @mock.patch('stat.S_ISBLK', return_value=True) + @mock.patch('os.stat') + def test_is_blk_device(self, mock_os_stat, mock_S_ISBLK): + dev = 'some_device' + self.assertTrue(utils.is_blk_device(dev)) + + @mock.patch('stat.S_ISBLK', return_value=False) + @mock.patch('os.stat') + def test_not_is_blk_device(self, mock_os_stat, mock_S_ISBLK): + dev = 'not_some_device' + self.assertFalse(utils.is_blk_device(dev)) + + @mock.patch('stat.S_ISBLK', side_effect=Exception) + @mock.patch('os.stat') + def test_fail_is_blk_device(self, mock_os_stat, mock_S_ISBLK): + dev = 'device_exception' + self.assertFalse(utils.is_blk_device(dev))