try:
kwargs['volume_type'] = volume_types.get_volume_type_by_name(
context, req_volume_type)
- except exception.NotFound:
- raise exc.HTTPNotFound()
+ except exception.VolumeTypeNotFound:
+ explanation = 'Volume type not found.'
+ raise exc.HTTPNotFound(explanation=explanation)
kwargs['metadata'] = volume.get('metadata', None)
cfg.ListOpt('memcached_servers',
default=None,
help='Memcached servers or None for in process cache.'),
+ cfg.StrOpt('default_volume_type',
+ default=None,
+ help='default volume type to use'),
cfg.StrOpt('volume_usage_audit_period',
default='month',
help='time period to generate volume usages for. '
flags.DECLARE('volume_driver', 'cinder.volume.manager')
flags.DECLARE('xiv_proxy', 'cinder.volume.xiv')
+def_vol_type = 'fake_vol_type'
+
def set_defaults(conf):
+ conf.set_default('default_volume_type', def_vol_type)
conf.set_default('volume_driver', 'cinder.volume.driver.FakeISCSIDriver')
conf.set_default('connection_type', 'fake')
conf.set_default('fake_rabbit', True)
from cinder import quota
from cinder import test
from cinder.volume import iscsi
+from cinder.tests import fake_flags
QUOTAS = quota.QUOTAS
FLAGS = flags.FLAGS
self.context,
volume_id)
+ def test_create_volume_with_volume_type(self):
+ """Test volume creation with default volume type."""
+ def fake_reserve(context, expire=None, **deltas):
+ return ["RESERVATION"]
+
+ def fake_commit(context, reservations):
+ pass
+
+ def fake_rollback(context, reservations):
+ pass
+
+ self.stubs.Set(QUOTAS, "reserve", fake_reserve)
+ self.stubs.Set(QUOTAS, "commit", fake_commit)
+ self.stubs.Set(QUOTAS, "rollback", fake_rollback)
+
+ volume_api = cinder.volume.api.API()
+
+ # Create volume with default volume type while default
+ # volume type doesn't exist, volume_type_id should be NULL
+ volume = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description')
+ self.assertEquals(volume['volume_type_id'], None)
+
+ # Create default volume type
+ vol_type = fake_flags.def_vol_type
+ db.volume_type_create(context.get_admin_context(),
+ dict(name=vol_type, extra_specs={}))
+
+ db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
+ vol_type)
+
+ # Create volume with default volume type
+ volume = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description')
+ self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+
+ # Create volume with specific volume type
+ vol_type = 'test'
+ db.volume_type_create(context.get_admin_context(),
+ dict(name=vol_type, extra_specs={}))
+ db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
+ vol_type)
+
+ volume = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description',
+ volume_type=db_vol_type)
+ self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+
def test_delete_busy_volume(self):
"""Test volume survives deletion if driver reports it as busy."""
volume = self._create_volume()
from cinder.volume import volume_types
from cinder.db.sqlalchemy import session as sql_session
from cinder.db.sqlalchemy import models
+from cinder.tests import fake_flags
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
vol_types = volume_types.get_all_types(self.ctxt)
self.assertEqual(total_volume_types, len(vol_types))
+ def test_get_default_volume_type(self):
+ """ Ensures default volume type can be retrieved """
+ volume_types.create(self.ctxt,
+ fake_flags.def_vol_type,
+ {})
+ default_vol_type = volume_types.get_default_volume_type()
+ self.assertEqual(default_vol_type.get('name'),
+ fake_flags.def_vol_type)
+
+ def test_default_volume_type_missing_in_db(self):
+ """ Ensures proper exception raised if default volume type
+ is not in database. """
+ session = sql_session.get_session()
+ default_vol_type = volume_types.get_default_volume_type()
+ self.assertEqual(default_vol_type, {})
+
def test_non_existent_vol_type_shouldnt_delete(self):
"""Ensures that volume type creation fails with invalid args"""
self.assertRaises(exception.VolumeTypeNotFoundByName,
from cinder.openstack.common import log as logging
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
+from cinder.volume import volume_types
import cinder.policy
from cinder import quota
if availability_zone is None:
availability_zone = FLAGS.storage_availability_zone
- if volume_type is None:
- volume_type_id = None
- else:
- volume_type_id = volume_type.get('id', None)
+ if not volume_type:
+ volume_type = volume_types.get_default_volume_type()
+
+ volume_type_id = volume_type.get('id')
options = {
'size': size,
return db.volume_type_get_by_name(context, name)
+def get_default_volume_type():
+ """Get the default volume type."""
+ name = FLAGS.default_volume_type
+ vol_type = {}
+
+ if name is not None:
+ ctxt = context.get_admin_context()
+ try:
+ vol_type = get_volume_type_by_name(ctxt, name)
+ except exception.VolumeTypeNotFoundByName, e:
+ # Couldn't find volume type with the name in default_volume_type
+ # flag, record this issue and move on
+ #TODO(zhiteng) consider add notification to warn admin
+ LOG.exception(_('Default volume type is not found, '
+ 'please check default_volume_type config: %s'), e)
+
+ return vol_type
+
+
def is_key_value_present(volume_type_id, key, value, volume_type=None):
if volume_type_id is None:
return False