self.context,
volume_id)
+ def test_run_attach_twice_multiattach_volume_for_instances(self):
+ """Make sure volume can be attached to multiple instances."""
+ mountpoint = "/dev/sdf"
+ # attach volume to the instance then to detach
+ instance_uuid = '12345678-1234-5678-1234-567812345699'
+ volume = tests_utils.create_volume(self.context,
+ admin_metadata={'readonly': 'True'},
+ multiattach=True,
+ **self.volume_params)
+ volume_id = volume['id']
+ self.volume.create_volume(self.context, volume_id)
+ attachment = self.volume.attach_volume(self.context, volume_id,
+ instance_uuid, None,
+ mountpoint, 'ro')
+ vol = db.volume_get(context.get_admin_context(), volume_id)
+ self.assertEqual('in-use', vol['status'])
+ self.assertTrue(vol['multiattach'])
+ self.assertEqual('attached', attachment['attach_status'])
+ self.assertEqual(mountpoint, attachment['mountpoint'])
+ self.assertEqual(instance_uuid, attachment['instance_uuid'])
+ self.assertIsNone(attachment['attached_host'])
+ admin_metadata = vol['volume_admin_metadata']
+ self.assertEqual(2, len(admin_metadata))
+ expected = dict(readonly='True', attached_mode='ro')
+ ret = {}
+ for item in admin_metadata:
+ ret.update({item['key']: item['value']})
+ self.assertDictMatch(expected, ret)
+ connector = {'initiator': 'iqn.2012-07.org.fake:01'}
+ conn_info = self.volume.initialize_connection(self.context,
+ volume_id, connector)
+ self.assertEqual('ro', conn_info['data']['access_mode'])
+
+ mountpoint2 = "/dev/sdx"
+ attachment2 = self.volume.attach_volume(self.context, volume_id,
+ instance_uuid, None,
+ mountpoint2, 'ro')
+ vol = db.volume_get(context.get_admin_context(), volume_id)
+ self.assertEqual('in-use', vol['status'])
+ self.assertEqual(True, vol['multiattach'])
+ self.assertIsNone(attachment2)
+
+ self.assertRaises(exception.VolumeAttached,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
+
def test_attach_detach_not_multiattach_volume_for_instances(self):
"""Make sure volume can't be attached to more than one instance."""
mountpoint = "/dev/sdf"
self.context,
volume_id)
+ def test_run_attach_twice_multiattach_volume_for_hosts(self):
+ """Make sure volume can be attached and detached from hosts."""
+ mountpoint = "/dev/sdf"
+ volume = tests_utils.create_volume(
+ self.context,
+ admin_metadata={'readonly': 'False'},
+ multiattach=True,
+ **self.volume_params)
+ volume_id = volume['id']
+ self.volume.create_volume(self.context, volume_id)
+ attachment = self.volume.attach_volume(self.context, volume_id, None,
+ 'fake_host', mountpoint, 'rw')
+ vol = db.volume_get(context.get_admin_context(), volume_id)
+ self.assertEqual('in-use', vol['status'])
+ self.assertTrue(vol['multiattach'])
+ self.assertEqual('attached', attachment['attach_status'])
+ self.assertEqual(mountpoint, attachment['mountpoint'])
+ self.assertIsNone(attachment['instance_uuid'])
+ # sanitized, conforms to RFC-952 and RFC-1123 specs.
+ self.assertEqual('fake-host', attachment['attached_host'])
+ admin_metadata = vol['volume_admin_metadata']
+ self.assertEqual(2, len(admin_metadata))
+ expected = dict(readonly='False', attached_mode='rw')
+ ret = {}
+ for item in admin_metadata:
+ ret.update({item['key']: item['value']})
+ self.assertDictMatch(expected, ret)
+ connector = {'initiator': 'iqn.2012-07.org.fake:01'}
+ conn_info = self.volume.initialize_connection(self.context,
+ volume_id, connector)
+ self.assertEqual('rw', conn_info['data']['access_mode'])
+
+ mountpoint2 = "/dev/sdx"
+ attachment2 = self.volume.attach_volume(self.context, volume_id, None,
+ 'fake_host', mountpoint2,
+ 'rw')
+ vol = db.volume_get(context.get_admin_context(), volume_id)
+ self.assertEqual('in-use', vol['status'])
+ self.assertIsNone(attachment2)
+
+ self.assertRaises(exception.VolumeAttached,
+ self.volume.delete_volume,
+ self.context,
+ volume_id)
+
def test_run_attach_detach_not_multiattach_volume_for_hosts(self):
"""Make sure volume can't be attached to more than one host."""
mountpoint = "/dev/sdf"