# clean up
self.volume.delete_volume(self.context, volume['id'])
+ def test_create_volume_from_unelevated_context(self):
+ """Test context does't change after volume creation failure."""
+ def fake_create_volume(context, volume_ref, snapshot_ref,
+ sourcevol_ref, image_service, image_id,
+ image_location):
+ raise exception.CinderException('fake exception')
+
+ def fake_reschedule_or_reraise(context, volume_id, exc_info,
+ snapshot_id, image_id, request_spec,
+ filter_properties, allow_reschedule):
+ self.assertFalse(context.is_admin)
+ self.assertFalse('admin' in context.roles)
+ #compare context passed in with the context we saved
+ self.assertDictMatch(self.saved_ctxt.__dict__,
+ context.__dict__)
+
+ #create context for testing
+ ctxt = self.context.deepcopy()
+ if 'admin' in ctxt.roles:
+ ctxt.roles.remove('admin')
+ ctxt.is_admin = False
+ #create one copy of context for future comparison
+ self.saved_ctxt = ctxt.deepcopy()
+
+ self.stubs.Set(self.volume, '_reschedule_or_reraise',
+ fake_reschedule_or_reraise)
+ self.stubs.Set(self.volume, '_create_volume',
+ fake_create_volume)
+
+ volume_src = self._create_volume()
+ self.volume.create_volume(ctxt, volume_src['id'])
+
def test_create_volume_from_sourcevol(self):
"""Test volume can be created from a source volume."""
def fake_create_cloned_volume(volume, src_vref):
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None):
"""Creates and exports the volume."""
+ context_before_elevated = context.deepcopy()
context = context.elevated()
if filter_properties is None:
filter_properties = {}
{'status': sourcevol_ref['status']})
exc_info = sys.exc_info()
# try to re-schedule volume:
- self._reschedule_or_reraise(context, volume_id, exc_info,
+ self._reschedule_or_reraise(context_before_elevated,
+ volume_id, exc_info,
snapshot_id, image_id,
request_spec, filter_properties,
allow_reschedule)