from webob import exc
from cinder.api.contrib import admin_actions
-from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder import db
from cinder import exception
from cinder.tests.unit import cast_as_call
from cinder.tests.unit import fake_snapshot
from cinder.volume import api as volume_api
-from cinder.volume.targets import tgt
CONF = cfg.CONF
self.volume_api = volume_api.API()
cast_as_call.mock_cast_as_call(self.volume_api.volume_rpcapi.client)
cast_as_call.mock_cast_as_call(self.volume_api.scheduler_rpcapi.client)
- self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
- self.stubs.Set(tgt.TgtAdm,
- 'create_iscsi_target',
- self._fake_create_iscsi_target)
-
- def _fake_create_iscsi_target(self, name, tid, lun,
- path, chap_auth=None, **kwargs):
- return 1
def _issue_volume_reset(self, ctx, volume, updated_status):
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.reserve_volume(ctx, volume)
mountpoint = '/dev/vbd'
attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID,
self.assertEqual(1, len(admin_metadata))
self.assertEqual('readonly', admin_metadata[0]['key'], 'readonly')
self.assertEqual('False', admin_metadata[0]['value'])
- # cleanup
- svc.stop()
def test_force_detach_host_attached_volume(self):
# admin context
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.initialize_connection(ctx, volume, connector)
mountpoint = '/dev/vbd'
host_name = 'fake-host'
self.assertEqual(1, len(admin_metadata))
self.assertEqual('readonly', admin_metadata[0]['key'])
self.assertEqual('False', admin_metadata[0]['value'])
- # cleanup
- svc.stop()
def test_volume_force_detach_raises_remote_error(self):
# admin context
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.reserve_volume(ctx, volume)
mountpoint = '/dev/vbd'
attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID,
self.assertRaises(messaging.RemoteError,
req.get_response,
app())
- # cleanup
- svc.stop()
def test_volume_force_detach_raises_db_error(self):
# In case of DB error 500 error code is returned to user
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.reserve_volume(ctx, volume)
mountpoint = '/dev/vbd'
attachment = self.volume_api.attach(ctx, volume, stubs.FAKE_UUID,
self.assertRaises(messaging.RemoteError,
req.get_response,
app())
- # cleanup
- svc.stop()
def test_attach_in_used_volume_by_instance(self):
"""Test that attaching to an in-use volume fails."""
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.reserve_volume(ctx, volume)
conn_info = self.volume_api.initialize_connection(ctx,
volume, connector)
None,
'/dev/vdb1',
'ro')
- # cleanup
- svc.stop()
def test_attach_in_used_volume_by_host(self):
"""Test that attaching to an in-use volume fails."""
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.reserve_volume(ctx, volume)
self.volume_api.initialize_connection(ctx, volume, connector)
self.volume_api.attach(ctx, volume, None, 'fake_host1',
'fake_host2',
'/dev/vbd1',
'ro')
- # cleanup
- svc.stop()
def test_invalid_iscsi_connector(self):
"""Test connector without the initiator (required by iscsi driver)."""
connector = {}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.assertRaises(exception.InvalidInput,
self.volume_api.initialize_connection,
ctx, volume, connector)
- # cleanup
- svc.stop()
def test_attach_attaching_volume_with_different_instance(self):
"""Test that attaching volume reserved for another instance fails."""
'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
self.volume_api.reserve_volume(ctx, volume)
values = {'volume_id': volume['id'],
'attach_status': 'attaching',
self.assertEqual(stubs.FAKE_UUID, attachment['instance_uuid'])
self.assertEqual(volume['id'], attachment['volume_id'], volume['id'])
self.assertEqual('attached', attachment['attach_status'])
- svc.stop()
def test_attach_attaching_volume_with_different_mode(self):
"""Test that attaching volume reserved for another mode fails."""
'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
+ self.addCleanup(svc.stop)
values = {'status': 'attaching',
'instance_uuid': fakes.get_fake_uuid()}
db.volume_update(ctx, volume['id'], values)
None,
mountpoint,
'ro')
- # cleanup
- svc.stop()
def _migrate_volume_prep(self):
admin_ctx = context.get_admin_context()