self.stubs = stubout.StubOutForTesting()
self.injected = []
self._services = []
+ FLAGS.set_override('fatal_exception_format_errors', True)
def tearDown(self):
"""Runs after each test method to tear down test environment."""
def test_no_instance_passthrough_404(self):
def fake_snapshot_get(*args, **kwargs):
- raise exception.InstanceNotFound()
+ raise exception.InstanceNotFound(instance_id='fake')
self.stubs.Set(volume.api.API, 'get_snapshot', fake_snapshot_get)
url = '/v2/fake/snapshots/70f6db34-de8d-4fbd-aafb-4065bdfa6115'
def test_copy_volume_to_image_invalidvolume(self):
def stub_upload_volume_to_image_service_raise(self, context, volume,
metadata, force):
- raise exception.InvalidVolume
+ raise exception.InvalidVolume(reason='blah')
self.stubs.Set(volume_api.API,
"copy_volume_to_image",
stub_upload_volume_to_image_service_raise)
self.assertEquals(unicode(exc), 'default message: 500')
def test_error_msg_exception_with_kwargs(self):
+ # NOTE(dprince): disable format errors for this test
+ self.flags(fatal_exception_format_errors=False)
+
class FakeCinderException(exception.CinderException):
message = "default message: %(mispelled_code)s"
raise exc()
def test_exceptions_raise(self):
+ # NOTE(dprince): disable format errors since we are not passing kwargs
+ self.flags(fatal_exception_format_errors=False)
for name in dir(exception):
exc = getattr(exception, name)
if isinstance(exc, type):
self.mox.StubOutWithMock(self.volume.driver, 'delete_volume')
self.volume.driver.delete_volume(
- mox.IgnoreArg()).AndRaise(exception.VolumeIsBusy)
+ mox.IgnoreArg()).AndRaise(exception.VolumeIsBusy(
+ volume_name='fake'))
self.mox.ReplayAll()
res = self.volume.delete_volume(self.context, volume_id)
self.assertEqual(True, res)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
self.volume.driver.delete_snapshot(
- mox.IgnoreArg()).AndRaise(exception.SnapshotIsBusy)
+ mox.IgnoreArg()).AndRaise(
+ exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
self.volume.delete_snapshot(self.context, snapshot_id)
snapshot_ref = db.snapshot_get(self.context, snapshot_id)
raise self.exception.NotAuthorized()
if self.xiv_info['xiv_address'] != FLAGS.san_ip:
- raise self.exception.HostNotFound()
+ raise self.exception.HostNotFound(host='fake')
def create_volume(self, volume):
if volume['size'] > 100:
- raise self.exception.VolumeBackendAPIException()
+ raise self.exception.VolumeBackendAPIException(data='blah')
self.volumes[volume['name']] = volume
def volume_exists(self, volume):
def initialize_connection(self, volume, connector):
if not self.volume_exists(volume):
- raise self.exception.VolumeNotFound()
+ raise self.exception.VolumeNotFound(volume_id=volume['id'])
lun_id = volume['id']
self.volumes[volume['name']]['attached'] = connector
def terminate_connection(self, volume, connector):
if not self.volume_exists(volume):
- raise self.exception.VolumeNotFound()
+ raise self.exception.VolumeNotFound(volume_id=volume['id'])
if not self.is_volume_attached(volume, connector):
- raise self.exception.VolumeNotFoundForInstance()
+ raise self.exception.VolumeNotFoundForInstance(instance_id='fake')
del self.volumes[volume['name']]['attached']
def is_volume_attached(self, volume, connector):
if not self.volume_exists(volume):
- raise self.exception.VolumeNotFound()
+ raise self.exception.VolumeNotFound(volume_id=volume['id'])
return (self.volumes[volume['name']].get('attached', None)
== connector)