Fakes For Scheduler tests.
"""
-import mox
+import mock
from cinder import db
from cinder.openstack.common import timeutils
setattr(self, key, val)
-def mox_host_manager_db_calls(mock, context):
- mock.StubOutWithMock(db, 'service_get_all_by_topic')
-
+def mock_host_manager_db_calls(mock_obj):
services = [
dict(id=1, host='host1', topic='volume', disabled=False,
availability_zone='zone1', updated_at=timeutils.utcnow()),
dict(id=5, host='host5', topic='volume', disabled=True,
availability_zone='zone4', updated_at=timeutils.utcnow()),
]
-
- db.service_get_all_by_topic(mox.IgnoreArg(),
- mox.IgnoreArg()).AndReturn(services)
+ mock_obj.return_value = services
Tests For Capacity Weigher.
"""
-import testtools
+import mock
+
+from oslo.config import cfg
from cinder import context
from cinder.openstack.common.scheduler.weights import HostWeightHandler
-
from cinder.scheduler.weights.capacity import CapacityWeigher
from cinder import test
from cinder.tests.scheduler import fakes
-from cinder.tests import utils as test_utils
+
+CONF = cfg.CONF
class CapacityWeigherTestCase(test.TestCase):
hosts,
weight_properties)[0]
- def _get_all_hosts(self):
+ @mock.patch('cinder.db.sqlalchemy.api.service_get_all_by_topic')
+ def _get_all_hosts(self, _mock_service_get_all_by_topic):
ctxt = context.get_admin_context()
- fakes.mox_host_manager_db_calls(self.mox, ctxt)
- self.mox.ReplayAll()
+ fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic)
host_states = self.host_manager.get_all_host_states(ctxt)
- self.mox.VerifyAll()
- self.mox.ResetAll()
+ _mock_service_get_all_by_topic.assert_called_once_with(
+ ctxt, CONF.volume_topic)
return host_states
def test_default_of_spreading_first(self):
Tests For Chance Weigher.
"""
+import mock
import random
-import testtools
-from oslo.config import cfg
-
-from cinder import context
from cinder.scheduler import host_manager
from cinder.scheduler.weights.chance import ChanceWeigher
from cinder import test
-from cinder.tests import utils as test_utils
class ChanceWeigherTestCase(test.TestCase):
self.not_random_float += 1.0
return self.not_random_float
- def test_chance_weigher(self):
+ @mock.patch('random.random')
+ def test_chance_weigher(self, _mock_random):
# stub random.random() to verify the ChanceWeigher
# is using random.random() (repeated calls to weigh should
# return incrementing weights)
weigher = ChanceWeigher()
- self.stubs.Set(random, 'random', self.fake_random)
+ _mock_random.side_effect = self.fake_random
self.fake_random(reset=True)
host_state = {'host': 'host.example.com', 'free_capacity_gb': 99999}
weight = weigher._weigh_object(host_state, None)
Tests For Filter Scheduler.
"""
-import testtools
+import mock
from cinder import context
from cinder import exception
-from cinder import test
-
-from cinder.openstack.common.scheduler import weights
from cinder.scheduler import filter_scheduler
from cinder.scheduler import host_manager
from cinder.tests.scheduler import fakes
from cinder.tests.scheduler import test_scheduler
-from cinder.tests import utils as test_utils
class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
driver_cls = filter_scheduler.FilterScheduler
def test_create_volume_no_hosts(self):
- """Ensure empty hosts/child_zones result in NoValidHosts exception."""
- def _fake_empty_call_zone_method(*args, **kwargs):
- return []
-
+ # Ensure empty hosts/child_zones result in NoValidHosts exception.
sched = fakes.FakeFilterScheduler()
fake_context = context.RequestContext('user', 'project')
self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
fake_context, request_spec, {})
- def test_create_volume_non_admin(self):
- """Test creating an instance locally using run_instance, passing
- a non-admin context. DB actions should work.
- """
+ @mock.patch('cinder.scheduler.host_manager.HostManager.'
+ 'get_all_host_states')
+ def test_create_volume_non_admin(self, _mock_get_all_host_states):
+ # Test creating a volume locally using create_volume, passing
+ # a non-admin context. DB actions should work.
self.was_admin = False
- def fake_get(context, *args, **kwargs):
- # make sure this is called with admin context, even though
- # we're using user context below
- self.was_admin = context.is_admin
+ def fake_get(ctxt):
+ # Make sure this is called with admin context, even though
+ # we're using user context below.
+ self.was_admin = ctxt.is_admin
return {}
sched = fakes.FakeFilterScheduler()
- self.stubs.Set(sched.host_manager, 'get_all_host_states', fake_get)
+ _mock_get_all_host_states.side_effect = fake_get
fake_context = context.RequestContext('user', 'project')
fake_context, request_spec, {})
self.assertTrue(self.was_admin)
- def test_schedule_happy_day(self):
- """Make sure there's nothing glaringly wrong with _schedule()
- by doing a happy day pass through.
- """
-
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_schedule_happy_day(self, _mock_service_get_all_by_topic):
+ # Make sure there's nothing glaringly wrong with _schedule()
+ # by doing a happy day pass through.
sched = fakes.FakeFilterScheduler()
sched.host_manager = fakes.FakeHostManager()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
- fakes.mox_host_manager_db_calls(self.mox, fake_context)
+ fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic)
request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
'volume_properties': {'project_id': 1,
'size': 1}}
- self.mox.ReplayAll()
weighed_host = sched._schedule(fake_context, request_spec, {})
self.assertIsNotNone(weighed_host.obj)
+ self.assertTrue(_mock_service_get_all_by_topic.called)
def test_max_attempts(self):
self.flags(scheduler_max_attempts=4)
sched._schedule(self.context, request_spec,
filter_properties=filter_properties)
- # should not have retry info in the populated filter properties:
+ # Should not have retry info in the populated filter properties.
self.assertNotIn("retry", filter_properties)
def test_retry_attempt_one(self):
self.assertEqual(1024, host_state.total_capacity_gb)
- def _host_passes_filters_setup(self):
+ def _host_passes_filters_setup(self, mock_obj):
sched = fakes.FakeFilterScheduler()
sched.host_manager = fakes.FakeHostManager()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
- fakes.mox_host_manager_db_calls(self.mox, fake_context)
+ fakes.mock_host_manager_db_calls(mock_obj)
- self.mox.ReplayAll()
return (sched, fake_context)
- def test_host_passes_filters_happy_day(self):
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_host_passes_filters_happy_day(self, _mock_service_get_topic):
"""Do a successful pass through of with host_passes_filters()."""
- sched, ctx = self._host_passes_filters_setup()
+ sched, ctx = self._host_passes_filters_setup(
+ _mock_service_get_topic)
request_spec = {'volume_id': 1,
'volume_type': {'name': 'LVM_iSCSI'},
'volume_properties': {'project_id': 1,
'size': 1}}
ret_host = sched.host_passes_filters(ctx, 'host1', request_spec, {})
self.assertEqual(ret_host.host, 'host1')
+ self.assertTrue(_mock_service_get_topic.called)
- def test_host_passes_filters_no_capacity(self):
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ def test_host_passes_filters_no_capacity(self, _mock_service_get_topic):
"""Fail the host due to insufficient capacity."""
- sched, ctx = self._host_passes_filters_setup()
+ sched, ctx = self._host_passes_filters_setup(
+ _mock_service_get_topic)
request_spec = {'volume_id': 1,
'volume_type': {'name': 'LVM_iSCSI'},
'volume_properties': {'project_id': 1,
self.assertRaises(exception.NoValidHost,
sched.host_passes_filters,
ctx, 'host1', request_spec, {})
+ self.assertTrue(_mock_service_get_topic.called)
Tests For Scheduler Host Filters.
"""
-import httplib
-import stubout
-import testtools
+import mock
from cinder import context
-from cinder import db
-from cinder import exception
from cinder.openstack.common import jsonutils
from cinder.openstack.common.scheduler import filters
from cinder import test
from cinder.tests.scheduler import fakes
-from cinder.tests import utils as test_utils
-from cinder import utils
-
-
-DATA = ''
-
-
-def stub_out_https_backend(stubs):
- """Stub out the httplib.HTTPRequest.getresponse.
-
- return faked-out data instead of grabbing actual contents of a resource.
-
- The stubbed getresponse() returns an iterator over
- the data "I am a teapot, short and stout\n"
-
- :param stubs: Set of stubout stubs
- """
-
- class FakeHTTPResponse(object):
-
- def read(self):
- return DATA
-
- def fake_do_request(self, *args, **kwargs):
- return httplib.OK, FakeHTTPResponse()
class HostFiltersTestCase(test.TestCase):
def setUp(self):
super(HostFiltersTestCase, self).setUp()
- self.stubs = stubout.StubOutForTesting()
- stub_out_https_backend(self.stubs)
self.context = context.RequestContext('fake', 'fake')
self.json_query = jsonutils.dumps(
['and',
for cls in classes:
self.class_map[cls.__name__] = cls
- def _stub_service_is_up(self, ret_value):
- def fake_service_is_up(service):
- return ret_value
- self.stubs.Set(utils, 'service_is_up', fake_service_is_up)
-
- def test_capacity_filter_passes(self):
- self._stub_service_is_up(True)
+ @mock.patch('cinder.utils.service_is_up')
+ def test_capacity_filter_passes(self, _mock_serv_is_up):
+ _mock_serv_is_up.return_value = True
filt_cls = self.class_map['CapacityFilter']()
filter_properties = {'size': 100}
service = {'disabled': False}
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
- def test_capacity_filter_fails(self):
- self._stub_service_is_up(True)
+ @mock.patch('cinder.utils.service_is_up')
+ def test_capacity_filter_fails(self, _mock_serv_is_up):
+ _mock_serv_is_up.return_value = True
filt_cls = self.class_map['CapacityFilter']()
filter_properties = {'size': 100}
service = {'disabled': False}
'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
- def test_capacity_filter_passes_infinite(self):
- self._stub_service_is_up(True)
+ @mock.patch('cinder.utils.service_is_up')
+ def test_capacity_filter_passes_infinite(self, _mock_serv_is_up):
+ _mock_serv_is_up.return_value = True
filt_cls = self.class_map['CapacityFilter']()
filter_properties = {'size': 100}
service = {'disabled': False}
'service': service})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
- def test_capacity_filter_passes_unknown(self):
- self._stub_service_is_up(True)
+ @mock.patch('cinder.utils.service_is_up')
+ def test_capacity_filter_passes_unknown(self, _mock_serv_is_up):
+ _mock_serv_is_up.return_value = True
filt_cls = self.class_map['CapacityFilter']()
filter_properties = {'size': 100}
service = {'disabled': False}
Tests For HostManager
"""
+import mock
+
from oslo.config import cfg
-from cinder import db
from cinder import exception
from cinder.openstack.common.scheduler import filters
from cinder.openstack.common import timeutils
from cinder.scheduler import host_manager
from cinder import test
-from cinder.tests.scheduler import fakes
CONF = cfg.CONF
self.assertEqual(len(filter_classes), 1)
self.assertEqual(filter_classes[0].__name__, 'FakeFilterClass2')
- def _mock_get_filtered_hosts(self, info, specified_filters=None):
- self.mox.StubOutWithMock(self.host_manager, '_choose_host_filters')
-
- info['got_objs'] = []
- info['got_fprops'] = []
-
- def fake_filter_one(_self, obj, filter_props):
- info['got_objs'].append(obj)
- info['got_fprops'].append(filter_props)
- return True
-
- self.stubs.Set(FakeFilterClass1, '_filter_one', fake_filter_one)
- self.host_manager._choose_host_filters(specified_filters).AndReturn(
- [FakeFilterClass1])
+ @mock.patch('cinder.scheduler.host_manager.HostManager.'
+ '_choose_host_filters')
+ def test_get_filtered_hosts(self, _mock_choose_host_filters):
+ filter_class = FakeFilterClass1
+ mock_func = mock.Mock()
+ mock_func.return_value = True
+ filter_class._filter_one = mock_func
+ _mock_choose_host_filters.return_value = [filter_class]
- def _verify_result(self, info, result):
- for x in info['got_fprops']:
- self.assertEqual(x, info['expected_fprops'])
- self.assertEqual(set(info['expected_objs']), set(info['got_objs']))
- self.assertEqual(set(result), set(info['got_objs']))
-
- def test_get_filtered_hosts(self):
fake_properties = {'moo': 1, 'cow': 2}
+ expected = []
+ for fake_host in self.fake_hosts:
+ expected.append(mock.call(fake_host, fake_properties))
- info = {'expected_objs': self.fake_hosts,
- 'expected_fprops': fake_properties}
-
- self._mock_get_filtered_hosts(info)
-
- self.mox.ReplayAll()
result = self.host_manager.get_filtered_hosts(self.fake_hosts,
fake_properties)
- self._verify_result(info, result)
+ self.assertEqual(expected, mock_func.call_args_list)
+ self.assertEqual(set(result), set(self.fake_hosts))
- def test_update_service_capabilities(self):
+ @mock.patch('cinder.openstack.common.timeutils.utcnow')
+ def test_update_service_capabilities(self, _mock_utcnow):
service_states = self.host_manager.service_states
self.assertDictMatch(service_states, {})
- self.mox.StubOutWithMock(timeutils, 'utcnow')
- timeutils.utcnow().AndReturn(31337)
- timeutils.utcnow().AndReturn(31338)
- timeutils.utcnow().AndReturn(31339)
+ _mock_utcnow.side_effect = [31337, 31338, 31339]
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
- self.mox.ReplayAll()
service_name = 'volume'
self.host_manager.update_service_capabilities(service_name, 'host1',
host1_volume_capabs)
'host3': host3_volume_capabs}
self.assertDictMatch(service_states, expected)
- def test_get_all_host_states(self):
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ @mock.patch('cinder.utils.service_is_up')
+ def test_get_all_host_states(self, _mock_service_is_up,
+ _mock_service_get_all_by_topic):
context = 'fake_context'
topic = CONF.volume_topic
- self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
- self.mox.StubOutWithMock(host_manager.LOG, 'warn')
- self.mox.StubOutWithMock(host_manager.utils, 'service_is_up')
-
services = [
dict(id=1, host='host1', topic='volume', disabled=False,
availability_zone='zone1', updated_at=timeutils.utcnow()),
availability_zone='zone4', updated_at=timeutils.utcnow()),
]
- db.service_get_all_by_topic(context, topic).AndReturn(services)
- host_manager.utils.service_is_up(services[0]).AndReturn(True)
- host_manager.utils.service_is_up(services[1]).AndReturn(True)
- host_manager.utils.service_is_up(services[2]).AndReturn(True)
- host_manager.utils.service_is_up(services[3]).AndReturn(True)
- host_manager.utils.service_is_up(services[4]).AndReturn(True)
- # Disabled service
- host_manager.LOG.warn("volume service is down or disabled. "
- "(host: host5)")
-
- db.service_get_all_by_topic(context, topic).AndReturn(services)
- host_manager.utils.service_is_up(services[0]).AndReturn(True)
- host_manager.utils.service_is_up(services[1]).AndReturn(True)
- host_manager.utils.service_is_up(services[2]).AndReturn(True)
- host_manager.utils.service_is_up(services[3]).AndReturn(False)
- # Stopped service
- host_manager.LOG.warn("volume service is down or disabled. "
- "(host: host4)")
- host_manager.utils.service_is_up(services[4]).AndReturn(True)
- # Disabled service
- host_manager.LOG.warn("volume service is down or disabled. "
- "(host: host5)")
-
- self.mox.ReplayAll()
+ # First test: service_is_up is always True, host5 is disabled
+ _mock_service_get_all_by_topic.return_value = services
+ _mock_service_is_up.return_value = True
+ _mock_warning = mock.Mock()
+ host_manager.LOG.warn = _mock_warning
+
+ # Get all states, make sure host5 is reported as down/disabled
self.host_manager.get_all_host_states(context)
+ _mock_service_get_all_by_topic.assert_called_with(context, topic)
+ expected = []
+ for service in services:
+ expected.append(mock.call(service))
+ self.assertEqual(expected, _mock_service_is_up.call_args_list)
+ _mock_warning.assert_called_with("volume service is down or disabled. "
+ "(host: host5)")
+
+ # Get host_state_map and make sure we have the first 4 hosts
host_state_map = self.host_manager.host_state_map
-
self.assertEqual(len(host_state_map), 4)
- # Check that service is up
for i in xrange(4):
volume_node = services[i]
host = volume_node['host']
- self.assertEqual(host_state_map[host].service,
- volume_node)
+ self.assertEqual(host_state_map[host].service, volume_node)
+
+ # Second test: Now service_is_up returns False for host4
+ _mock_service_is_up.reset_mock()
+ _mock_service_is_up.side_effect = [True, True, True, False, True]
+ _mock_service_get_all_by_topic.reset_mock()
+ _mock_warning.reset_mock()
+ # Get all states, make sure hosts 4 and 5 is reported as down/disabled
self.host_manager.get_all_host_states(context)
+ _mock_service_get_all_by_topic.assert_called_with(context, topic)
+ expected = []
+ for service in services:
+ expected.append(mock.call(service))
+ self.assertEqual(expected, _mock_service_is_up.call_args_list)
+ expected = []
+ for num in ['4', '5']:
+ expected.append(mock.call("volume service is down or disabled. "
+ "(host: host" + num + ")"))
+ self.assertEqual(expected, _mock_warning.call_args_list)
+
+ # Get host_state_map and make sure we have the first 4 hosts
host_state_map = self.host_manager.host_state_map
-
self.assertEqual(len(host_state_map), 3)
for i in xrange(3):
volume_node = services[i]
"""
+import mock
+
from oslo.config import cfg
from cinder import context
-from cinder.openstack.common import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test
def tearDown(self):
super(SchedulerRpcAPITestCase, self).tearDown()
- def _test_scheduler_api(self, method, rpc_method, **kwargs):
+ def _test_scheduler_api(self, method, rpc_method, _mock_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = scheduler_rpcapi.SchedulerAPI()
- expected_retval = 'foo' if method == 'call' else None
+ expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', rpcapi.RPC_API_VERSION)
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = expected_version
if expected_retval:
return expected_retval
- self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+ _mock_method.side_effect = _fake_rpc_method
retval = getattr(rpcapi, method)(ctxt, **kwargs)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
- def test_update_service_capabilities(self):
+ @mock.patch('cinder.openstack.common.rpc.fanout_cast')
+ def test_update_service_capabilities(self, _mock_rpc_method):
self._test_scheduler_api('update_service_capabilities',
rpc_method='fanout_cast',
+ _mock_method=_mock_rpc_method,
service_name='fake_name',
host='fake_host',
capabilities='fake_capabilities')
- def test_create_volume(self):
+ @mock.patch('cinder.openstack.common.rpc.cast')
+ def test_create_volume(self, _mock_rpc_method):
self._test_scheduler_api('create_volume',
rpc_method='cast',
+ _mock_method=_mock_rpc_method,
topic='topic',
volume_id='volume_id',
snapshot_id='snapshot_id',
filter_properties='filter_properties',
version='1.2')
- def test_migrate_volume_to_host(self):
+ @mock.patch('cinder.openstack.common.rpc.cast')
+ def test_migrate_volume_to_host(self, _mock_rpc_method):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
+ _mock_method=_mock_rpc_method,
topic='topic',
volume_id='volume_id',
host='host',
Tests For Scheduler
"""
+import mock
+
from cinder import context
from cinder import db
from cinder import exception
manager = self.manager
self.assertIsInstance(manager.driver, self.driver_cls)
- def test_update_service_capabilities(self):
- service_name = 'fake_service'
+ @mock.patch('cinder.scheduler.driver.Scheduler.'
+ 'update_service_capabilities')
+ def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
+ # Test no capabilities passes empty dictionary
+ service = 'fake_service'
host = 'fake_host'
- self.mox.StubOutWithMock(self.manager.driver,
- 'update_service_capabilities')
+ self.manager.update_service_capabilities(self.context,
+ service_name=service,
+ host=host)
+ _mock_update_cap.assert_called_once_with(service, host, {})
- # Test no capabilities passes empty dictionary
- self.manager.driver.update_service_capabilities(service_name,
- host, {})
- self.mox.ReplayAll()
- result = self.manager.update_service_capabilities(
- self.context,
- service_name=service_name,
- host=host)
- self.mox.VerifyAll()
-
- self.mox.ResetAll()
+ @mock.patch('cinder.scheduler.driver.Scheduler.'
+ 'update_service_capabilities')
+ def test_update_service_capabilities_correct(self, _mock_update_cap):
# Test capabilities passes correctly
+ service = 'fake_service'
+ host = 'fake_host'
capabilities = {'fake_capability': 'fake_value'}
- self.manager.driver.update_service_capabilities(service_name,
- host,
- capabilities)
- self.mox.ReplayAll()
- result = self.manager.update_service_capabilities(
- self.context,
- service_name=service_name, host=host,
- capabilities=capabilities)
-
- def test_create_volume_exception_puts_volume_in_error_state(self):
- """Test NoValidHost exception behavior for create_volume.
-
- Puts the volume in 'error' state and eats the exception.
- """
- fake_volume_id = 1
- self._mox_schedule_method_helper('schedule_create_volume')
- self.mox.StubOutWithMock(db, 'volume_update')
+ self.manager.update_service_capabilities(self.context,
+ service_name=service,
+ host=host,
+ capabilities=capabilities)
+ _mock_update_cap.assert_called_once_with(service, host, capabilities)
+
+ @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
+ @mock.patch('cinder.db.volume_update')
+ def test_create_volume_exception_puts_volume_in_error_state(
+ self, _mock_volume_update, _mock_sched_create):
+ # Test NoValidHost exception behavior for create_volume.
+ # Puts the volume in 'error' state and eats the exception.
+ _mock_sched_create.side_effect = exception.NoValidHost(reason="")
+ fake_volume_id = 1
topic = 'fake_topic'
- volume_id = fake_volume_id
request_spec = {'volume_id': fake_volume_id}
- self.manager.driver.schedule_create_volume(
- self.context,
- request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
- db.volume_update(self.context, fake_volume_id, {'status': 'error'})
-
- self.mox.ReplayAll()
- self.manager.create_volume(self.context, topic, volume_id,
+ self.manager.create_volume(self.context, topic, fake_volume_id,
request_spec=request_spec,
filter_properties={})
-
- def test_migrate_volume_exception_returns_volume_state(self):
- """Test NoValidHost exception behavior for migrate_volume_to_host.
-
- Puts the volume in 'error_migrating' state and eats the exception.
- """
+ _mock_volume_update.assert_called_once_with(self.context,
+ fake_volume_id,
+ {'status': 'error'})
+ _mock_sched_create.assert_called_once_with(self.context, request_spec,
+ {})
+
+ @mock.patch('cinder.scheduler.driver.Scheduler.host_passes_filters')
+ @mock.patch('cinder.db.volume_update')
+ def test_migrate_volume_exception_returns_volume_state(
+ self, _mock_volume_update, _mock_host_passes):
+ # Test NoValidHost exception behavior for migrate_volume_to_host.
+ # Puts the volume in 'error_migrating' state and eats the exception.
+ _mock_host_passes.side_effect = exception.NoValidHost(reason="")
fake_volume_id = 1
- self._mox_schedule_method_helper('host_passes_filters')
- self.mox.StubOutWithMock(db, 'volume_update')
-
topic = 'fake_topic'
- volume_id = fake_volume_id
request_spec = {'volume_id': fake_volume_id}
- self.manager.driver.host_passes_filters(
- self.context, 'host',
- request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
- db.volume_update(self.context, fake_volume_id,
- {'migration_status': None})
-
- self.mox.ReplayAll()
- self.manager.migrate_volume_to_host(self.context, topic, volume_id,
- 'host', True,
+ self.manager.migrate_volume_to_host(self.context, topic,
+ fake_volume_id, 'host', True,
request_spec=request_spec,
filter_properties={})
-
- def _mox_schedule_method_helper(self, method_name):
- # Make sure the method exists that we're going to test call
- def stub_method(*args, **kwargs):
- pass
-
- setattr(self.manager.driver, method_name, stub_method)
-
- self.mox.StubOutWithMock(self.manager.driver,
- method_name)
+ _mock_volume_update.assert_called_once_with(self.context,
+ fake_volume_id,
+ {'migration_status': None})
+ _mock_host_passes.assert_called_once_with(self.context, 'host',
+ request_spec, {})
class SchedulerTestCase(test.TestCase):
self.context = context.RequestContext('fake_user', 'fake_project')
self.topic = 'fake_topic'
- def test_update_service_capabilities(self):
+ @mock.patch('cinder.scheduler.driver.Scheduler.'
+ 'update_service_capabilities')
+ def test_update_service_capabilities(self, _mock_update_cap):
service_name = 'fake_service'
host = 'fake_host'
-
- self.mox.StubOutWithMock(self.driver.host_manager,
- 'update_service_capabilities')
-
capabilities = {'fake_capability': 'fake_value'}
- self.driver.host_manager.update_service_capabilities(service_name,
- host,
- capabilities)
- self.mox.ReplayAll()
- result = self.driver.update_service_capabilities(service_name,
- host,
- capabilities)
-
- def test_hosts_up(self):
- service1 = {'host': 'host1'}
- service2 = {'host': 'host2'}
+ self.driver.update_service_capabilities(service_name, host,
+ capabilities)
+ _mock_update_cap.assert_called_once_with(service_name, host,
+ capabilities)
+
+ @mock.patch('cinder.db.service_get_all_by_topic')
+ @mock.patch('cinder.utils.service_is_up')
+ def test_hosts_up(self, _mock_serv_is_up, _mock_serv_get_all_by_topic):
+ service1 = {'host': 'host1', 'disabled': False}
+ service2 = {'host': 'host2', 'disabled': False}
services = [service1, service2]
- self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
- self.mox.StubOutWithMock(utils, 'service_is_up')
-
- db.service_get_all_by_topic(self.context,
- self.topic).AndReturn(services)
- utils.service_is_up(service1).AndReturn(False)
- utils.service_is_up(service2).AndReturn(True)
+ def fake_serv_is_up(service):
+ return service['host'] is 'host2'
- self.mox.ReplayAll()
+ _mock_serv_get_all_by_topic.return_value = services
+ _mock_serv_is_up.side_effect = fake_serv_is_up
result = self.driver.hosts_up(self.context, self.topic)
self.assertEqual(result, ['host2'])
+ _mock_serv_get_all_by_topic.assert_called_once_with(self.context,
+ self.topic)
class SchedulerDriverBaseTestCase(SchedulerTestCase):
super(SchedulerDriverModuleTestCase, self).setUp()
self.context = context.RequestContext('fake_user', 'fake_project')
- def test_volume_host_update_db(self):
- self.mox.StubOutWithMock(timeutils, 'utcnow')
- self.mox.StubOutWithMock(db, 'volume_update')
-
- timeutils.utcnow().AndReturn('fake-now')
- db.volume_update(self.context, 31337,
- {'host': 'fake_host',
- 'scheduled_at': 'fake-now'})
-
- self.mox.ReplayAll()
+ @mock.patch('cinder.db.volume_update')
+ @mock.patch('cinder.openstack.common.timeutils.utcnow')
+ def test_volume_host_update_db(self, _mock_utcnow, _mock_vol_update):
+ _mock_utcnow.return_value = 'fake-now'
driver.volume_update_db(self.context, 31337, 'fake_host')
+ _mock_vol_update.assert_called_once_with(self.context, 31337,
+ {'host': 'fake_host',
+ 'scheduled_at': 'fake-now'})