_view_builder_class = views_types.ViewBuilder
- def _notify_volume_type_error(self, context, method, payload):
+ def _notify_volume_type_error(self, context, method, err,
+ volume_type=None, id=None, name=None):
+ payload = dict(
+ volume_types=volume_type, name=name, id=id, error_message=err)
rpc.get_notifier('volumeType').error(context, method, payload)
+ def _notify_volume_type_info(self, context, method, volume_type):
+ payload = dict(volume_types=volume_type)
+ rpc.get_notifier('volumeType').info(context, method, payload)
+
@wsgi.action("create")
@wsgi.serializers(xml=types.VolumeTypeTemplate)
def _create(self, req, body):
vol_type = body['volume_type']
name = vol_type.get('name', None)
+ description = vol_type.get('description')
specs = vol_type.get('extra_specs', {})
is_public = vol_type.get('os-volume-type-access:is_public', True)
- if name is None or name == "":
- raise webob.exc.HTTPBadRequest()
+ if name is None or len(name.strip()) == 0:
+ msg = _("Volume type name can not be empty.")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
try:
- volume_types.create(context, name, specs, is_public)
+ volume_types.create(context,
+ name,
+ specs,
+ is_public,
+ description=description)
vol_type = volume_types.get_volume_type_by_name(context, name)
req.cache_resource(vol_type, name='types')
- notifier_info = dict(volume_types=vol_type)
- rpc.get_notifier('volumeType').info(context, 'volume_type.create',
- notifier_info)
+ self._notify_volume_type_info(
+ context, 'volume_type.create', vol_type)
except exception.VolumeTypeExists as err:
- notifier_err = dict(volume_types=vol_type, error_message=err)
- self._notify_volume_type_error(context,
- 'volume_type.create',
- notifier_err)
-
+ self._notify_volume_type_error(
+ context, 'volume_type.create', err, volume_type=vol_type)
raise webob.exc.HTTPConflict(explanation=six.text_type(err))
except exception.NotFound as err:
- notifier_err = dict(volume_types=vol_type, error_message=err)
- self._notify_volume_type_error(context,
- 'volume_type.create',
- notifier_err)
+ self._notify_volume_type_error(
+ context, 'volume_type.create', err, name=name)
raise webob.exc.HTTPNotFound()
return self._view_builder.show(req, vol_type)
+ @wsgi.action("update")
+ @wsgi.serializers(xml=types.VolumeTypeTemplate)
+ def _update(self, req, id, body):
+ # Update description for a given volume type.
+ context = req.environ['cinder.context']
+ authorize(context)
+
+ if not self.is_valid_body(body, 'volume_type'):
+ raise webob.exc.HTTPBadRequest()
+
+ vol_type = body['volume_type']
+ description = vol_type.get('description', None)
+
+ if description is None:
+ msg = _("Specify the description to update.")
+ raise webob.exc.HTTPBadRequest(explanation=msg)
+
+ try:
+ # check it exists
+ vol_type = volume_types.get_volume_type(context, id)
+ volume_types.update(context, id, description)
+ # get the updated
+ vol_type = volume_types.get_volume_type(context, id)
+ req.cache_resource(vol_type, name='types')
+ self._notify_volume_type_info(
+ context, 'volume_type.update', vol_type)
+
+ except exception.VolumeTypeNotFound as err:
+ self._notify_volume_type_error(
+ context, 'volume_type.update', err, id=id)
+ raise webob.exc.HTTPNotFound(explanation=six.text_type(err))
+ except exception.VolumeTypeExists as err:
+ self._notify_volume_type_error(
+ context, 'volume_type.update', err, volume_type=vol_type)
+ raise webob.exc.HTTPConflict(explanation=six.text_type(err))
+ except exception.VolumeTypeUpdateFailed as err:
+ self._notify_volume_type_error(
+ context, 'volume_type.update', err, volume_type=vol_type)
+ raise webob.exc.HTTPInternalServerError(
+ explanation=six.text_type(err))
+
+ return self._view_builder.show(req, vol_type)
+
@wsgi.action("delete")
def _delete(self, req, id):
"""Deletes an existing volume type."""
try:
vol_type = volume_types.get_volume_type(context, id)
volume_types.destroy(context, vol_type['id'])
- notifier_info = dict(volume_types=vol_type)
- rpc.get_notifier('volumeType').info(context,
- 'volume_type.delete',
- notifier_info)
+ self._notify_volume_type_info(
+ context, 'volume_type.delete', vol_type)
except exception.VolumeTypeInUse as err:
- notifier_err = dict(id=id, error_message=err)
- self._notify_volume_type_error(context,
- 'volume_type.delete',
- notifier_err)
+ self._notify_volume_type_error(
+ context, 'volume_type.delete', err, volume_type=vol_type)
msg = _('Target volume type is still in use.')
raise webob.exc.HTTPBadRequest(explanation=msg)
except exception.NotFound as err:
- notifier_err = dict(id=id, error_message=err)
- self._notify_volume_type_error(context,
- 'volume_type.delete',
- notifier_err)
-
+ self._notify_volume_type_error(
+ context, 'volume_type.delete', err, id=id)
raise webob.exc.HTTPNotFound()
return webob.Response(status_int=202)
raise webob.exc.HTTPBadRequest(explanation=msg)
def _extend_vol_type(self, vol_type_rval, vol_type_ref):
- key = "%s:is_public" % (Volume_type_access.alias)
- vol_type_rval[key] = vol_type_ref['is_public']
+ if vol_type_ref:
+ key = "%s:is_public" % (Volume_type_access.alias)
+ vol_type_rval[key] = vol_type_ref.get('is_public', True)
@wsgi.extends
def show(self, req, resp_obj, id):
from lxml import etree
from oslo.serialization import jsonutils
+from oslo.utils import excutils
import six
import webob
meth = getattr(self, action)
else:
meth = getattr(self.controller, action)
- except AttributeError:
- if (not self.wsgi_actions or
- action not in ['action', 'create', 'delete']):
- # Propagate the error
- raise
+ except AttributeError as e:
+ with excutils.save_and_reraise_exception(e) as ctxt:
+ if (not self.wsgi_actions or action not in ['action',
+ 'create',
+ 'delete',
+ 'update']):
+ LOG.exception(six.text_type(e))
+ else:
+ ctxt.reraise = False
else:
return meth, self.wsgi_extensions.get(action, [])
def make_voltype(elem):
elem.set('id')
elem.set('name')
+ elem.set('description')
extra_specs = xmlutil.make_flat_dict('extra_specs', selector='extra_specs')
elem.append(extra_specs)
"""Return a single volume type item."""
context = req.environ['cinder.context']
- try:
- vol_type = volume_types.get_volume_type(context, id)
+ # get default volume type
+ if id is not None and id == 'default':
+ vol_type = volume_types.get_default_volume_type()
+ if not vol_type:
+ msg = _("Default volume type can not be found.")
+ raise exc.HTTPNotFound(explanation=msg)
req.cache_resource(vol_type, name='types')
- except exception.NotFound:
- msg = _("Volume type not found")
- raise exc.HTTPNotFound(explanation=msg)
+ else:
+ try:
+ vol_type = volume_types.get_volume_type(context, id)
+ req.cache_resource(vol_type, name='types')
+ except exception.NotFound:
+ msg = _("Volume type not found")
+ raise exc.HTTPNotFound(explanation=msg)
return self._view_builder.show(req, vol_type)
"""Trim away extraneous volume type attributes."""
trimmed = dict(id=volume_type.get('id'),
name=volume_type.get('name'),
- extra_specs=volume_type.get('extra_specs'))
+ extra_specs=volume_type.get('extra_specs'),
+ description=volume_type.get('description'))
return trimmed if brief else dict(volume_type=trimmed)
def index(self, request, volume_types):
return IMPL.volume_type_create(context, values, projects)
+def volume_type_update(context, volume_type_id, values):
+ return IMPL.volume_type_update(context, volume_type_id, values)
+
+
def volume_type_get_all(context, inactive=False, filters=None):
"""Get all volume types.
return query
+@require_admin_context
+def volume_type_update(context, volume_type_id, values):
+ session = get_session()
+ with session.begin():
+ volume_type_ref = _volume_type_ref_get(context,
+ volume_type_id,
+ session)
+
+ if not volume_type_ref:
+ raise exception.VolumeTypeNotFound(type_id=volume_type_id)
+
+ volume_type_ref.update(values)
+ volume_type_ref.save(session=session)
+ volume_type = volume_type_get(context, volume_type_id)
+
+ return volume_type
+
+
@require_context
def volume_type_get_all(context, inactive=False, filters=None):
"""Returns a dict describing all volume_types with name as key."""
expected_fields=expected_fields)
+@require_context
+def _volume_type_ref_get(context, id, session=None, inactive=False):
+ read_deleted = "yes" if inactive else "no"
+ result = model_query(context,
+ models.VolumeTypes,
+ session=session,
+ read_deleted=read_deleted).\
+ options(joinedload('extra_specs')).\
+ filter_by(id=id).\
+ first()
+
+ if not result:
+ raise exception.VolumeTypeNotFound(volume_type_id=id)
+
+ return result
+
+
@require_context
def _volume_type_get_by_name(context, name, session=None):
result = model_query(context, models.VolumeTypes, session=session).\
--- /dev/null
+CREATE TABLE volume_types_v33 (
+ created_at DATETIME,
+ updated_at DATETIME,
+ deleted_at DATETIME,
+ deleted BOOLEAN,
+ id VARCHAR(36) NOT NULL,
+ name VARCHAR(255),
+ is_public BOOLEAN,
+ qos_specs_id VARCHAR(36),
+ PRIMARY KEY (id)
+);
+
+INSERT INTO volume_types_v33
+ SELECT created_at,
+ updated_at,
+ deleted_at,
+ deleted,
+ id,
+ name,
+ is_public,
+ qos_specs_id
+ FROM volume_types;
+
+DROP TABLE volume_types;
+ALTER TABLE volume_types_v33 RENAME TO volume_types;
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy import Column, MetaData, Table, String
+
+
+def upgrade(migrate_engine):
+ """Add description column to volume_types."""
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ volume_types = Table('volume_types', meta, autoload=True)
+ description = Column('description', String(255))
+ volume_types.create_column(description)
+ volume_types.update().values(description=None).execute()
+
+
+def downgrade(migrate_engine):
+ """Remove description column to volumes."""
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ volume_types = Table('volume_types', meta, autoload=True)
+ description = volume_types.columns.description
+ volume_types.drop_column(description)
__tablename__ = "volume_types"
id = Column(String(36), primary_key=True)
name = Column(String(255))
+ description = Column(String(255))
# A reference to qos_specs entity
qos_specs_id = Column(String(36),
ForeignKey('quality_of_service_specs.id'))
"name %(name)s and specs %(extra_specs)s")
+class VolumeTypeUpdateFailed(CinderException):
+ message = _("Cannot update volume_type %(id)s")
+
+
class UnknownCmd(VolumeDriverException):
message = _("Unknown or unsupported command %(cmd)s")
# License for the specific language governing permissions and limitations
# under the License.
+import six
import webob
from cinder.api.contrib import types_manage
"key3": "value3",
"key4": "value4",
"key5": "value5"}
- return dict(id=id, name='vol_type_%s' % str(id), extra_specs=specs)
+ return dict(id=id,
+ name='vol_type_%s' % six.text_type(id),
+ description='vol_type_desc_%s' % six.text_type(id),
+ extra_specs=specs)
+
+
+def stub_volume_type_updated(id):
+ return dict(id=id,
+ name='vol_type_%s_%s' % (six.text_type(id), six.text_type(id)),
+ description='vol_type_desc_%s_%s' % (
+ six.text_type(id), six.text_type(id)))
+
+
+def stub_volume_type_updated_desc_only(id):
+ return dict(id=id,
+ name='vol_type_%s' % six.text_type(id),
+ description='vol_type_desc_%s_%s' % (
+ six.text_type(id), six.text_type(id)))
def return_volume_types_get_volume_type(context, id):
pass
-def return_volume_types_create(context, name, specs, is_public):
+def return_volume_types_create(context,
+ name,
+ specs,
+ is_public,
+ description):
pass
-def return_volume_types_create_duplicate_type(context, name, specs, is_public):
+def return_volume_types_create_duplicate_type(context,
+ name,
+ specs,
+ is_public,
+ description):
raise exception.VolumeTypeExists(id=name)
+def return_volume_types_update(context, id, description):
+ pass
+
+
+def return_volume_types_update_fail(context, id, description):
+ raise exception.VolumeTypeUpdateFailed(id=id)
+
+
+def return_volume_types_get_volume_type_updated(context, id):
+ if id == "777":
+ raise exception.VolumeTypeNotFound(volume_type_id=id)
+ if id == '888':
+ return stub_volume_type_updated_desc_only(int(id))
+
+ # anything else
+ return stub_volume_type_updated(int(id))
+
+
def return_volume_types_get_by_name(context, name):
if name == "777":
raise exception.VolumeTypeNotFoundByName(volume_type_name=name)
return stub_volume_type(int(name.split("_")[2]))
+def return_volume_types_get_default():
+ return stub_volume_type(1)
+
+
+def return_volume_types_get_default_not_found():
+ return {}
+
+
class VolumeTypesManageApiTest(test.TestCase):
def setUp(self):
super(VolumeTypesManageApiTest, self).setUp()
return_volume_types_destroy)
req = fakes.HTTPRequest.blank('/v2/fake/types/1')
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.controller._delete(req, 1)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
def test_volume_types_delete_not_found(self):
self.stubs.Set(volume_types, 'get_volume_type',
self.stubs.Set(volume_types, 'destroy',
return_volume_types_destroy)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
req = fakes.HTTPRequest.blank('/v2/fake/types/777')
self.assertRaises(webob.exc.HTTPNotFound, self.controller._delete,
req, '777')
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
def test_volume_types_with_volumes_destroy(self):
self.stubs.Set(volume_types, 'get_volume_type',
self.stubs.Set(volume_types, 'destroy',
return_volume_types_with_volumes_destroy)
req = fakes.HTTPRequest.blank('/v2/fake/types/1')
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
self.controller._delete(req, 1)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 1)
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
def test_create(self):
self.stubs.Set(volume_types, 'create',
"extra_specs": {"key1": "value1"}}}
req = fakes.HTTPRequest.blank('/v2/fake/types')
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
res_dict = self.controller._create(req, body)
- self.assertEqual(len(fake_notifier.NOTIFICATIONS), 1)
- self.assertEqual(1, len(res_dict))
- self.assertEqual('vol_type_1', res_dict['volume_type']['name'])
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
+ self._check_test_results(res_dict, {
+ 'expected_name': 'vol_type_1', 'expected_desc': 'vol_type_desc_1'})
def test_create_duplicate_type_fail(self):
self.stubs.Set(volume_types, 'create',
def test_create_malformed_entity(self):
body = {'volume_type': 'string'}
self._create_volume_type_bad_body(body=body)
+
+ def test_update(self):
+ self.stubs.Set(volume_types, 'update',
+ return_volume_types_update)
+ self.stubs.Set(volume_types, 'get_volume_type',
+ return_volume_types_get_volume_type_updated)
+
+ body = {"volume_type": {"description": "vol_type_desc_1_1"}}
+ req = fakes.HTTPRequest.blank('/v2/fake/types/1')
+ req.method = 'PUT'
+
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
+ res_dict = self.controller._update(req, '1', body)
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
+ self._check_test_results(res_dict,
+ {'expected_desc': 'vol_type_desc_1_1'})
+
+ def test_update_non_exist(self):
+ self.stubs.Set(volume_types, 'update',
+ return_volume_types_update)
+ self.stubs.Set(volume_types, 'get_volume_type',
+ return_volume_types_get_volume_type)
+
+ body = {"volume_type": {"description": "vol_type_desc_1_1"}}
+ req = fakes.HTTPRequest.blank('/v2/fake/types/777')
+ req.method = 'PUT'
+
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller._update, req, '777', body)
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
+
+ def test_update_db_fail(self):
+ self.stubs.Set(volume_types, 'update',
+ return_volume_types_update_fail)
+ self.stubs.Set(volume_types, 'get_volume_type',
+ return_volume_types_get_volume_type)
+
+ body = {"volume_type": {"description": "vol_type_desc_1_1"}}
+ req = fakes.HTTPRequest.blank('/v2/fake/types/1')
+ req.method = 'PUT'
+
+ self.assertEqual(0, len(fake_notifier.NOTIFICATIONS))
+ self.assertRaises(webob.exc.HTTPInternalServerError,
+ self.controller._update, req, '1', body)
+ self.assertEqual(1, len(fake_notifier.NOTIFICATIONS))
+
+ def test_update_no_description(self):
+ body = {"volume_type": {}}
+ req = fakes.HTTPRequest.blank('/v2/fake/types/1')
+ req.method = 'PUT'
+
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.controller._update, req, '1', body)
+
+ def _check_test_results(self, results, expected_results):
+ self.assertEqual(1, len(results))
+ self.assertEqual(expected_results['expected_desc'],
+ results['volume_type']['description'])
+ if expected_results.get('expected_name'):
+ self.assertEqual(expected_results['expected_name'],
+ results['volume_type']['name'])
\ No newline at end of file
self.type_action_controller.show(self.req, resp, '0')
self.assertEqual({'id': '0', 'os-volume-type-access:is_public': True},
resp.obj['volume_type'])
- self.type_action_controller.show(self.req, resp, '2')
- self.assertEqual({'id': '0', 'os-volume-type-access:is_public': False},
- resp.obj['volume_type'])
def test_detail(self):
resp = FakeResponse()
updated_at=now,
extra_specs={},
deleted_at=None,
+ description=None,
id=42)
request = fakes.HTTPRequest.blank("/v1")
self.assertIn('volume_type', output)
expected_volume_type = dict(name='new_type',
extra_specs={},
+ description=None,
id=42)
self.assertDictMatch(output['volume_type'], expected_volume_type)
updated_at=now,
extra_specs={},
deleted_at=None,
+ description=None,
id=42 + i))
request = fakes.HTTPRequest.blank("/v1")
for i in range(0, 10):
expected_volume_type = dict(name='new_type',
extra_specs={},
- id=42 + i)
+ id=42 + i,
+ description=None)
self.assertDictMatch(output['volume_types'][i],
expected_volume_type)
from lxml import etree
from oslo.utils import timeutils
+import six
import webob
from cinder.api.v2 import types
}
return dict(
id=id,
- name='vol_type_%s' % str(id),
+ name='vol_type_%s' % six.text_type(id),
+ description='vol_type_desc_%s' % six.text_type(id),
extra_specs=specs,
)
return stub_volume_type(int(name.split("_")[2]))
+def return_volume_types_get_default():
+ return stub_volume_type(1)
+
+
+def return_volume_types_get_default_not_found():
+ return {}
+
+
class VolumeTypesApiTest(test.TestCase):
def setUp(self):
super(VolumeTypesApiTest, self).setUp()
self.assertRaises(webob.exc.HTTPNotFound, self.controller.show,
req, '777')
+ def test_get_default(self):
+ self.stubs.Set(volume_types, 'get_default_volume_type',
+ return_volume_types_get_default)
+ req = fakes.HTTPRequest.blank('/v2/fake/types/default')
+ req.method = 'GET'
+ res_dict = self.controller.show(req, 'default')
+ self.assertEqual(1, len(res_dict))
+ self.assertEqual('vol_type_1', res_dict['volume_type']['name'])
+ self.assertEqual('vol_type_desc_1',
+ res_dict['volume_type']['description'])
+
+ def test_get_default_not_found(self):
+ self.stubs.Set(volume_types, 'get_default_volume_type',
+ return_volume_types_get_default_not_found)
+ req = fakes.HTTPRequest.blank('/v2/fake/types/default')
+ req.method = 'GET'
+
+ self.assertRaises(webob.exc.HTTPNotFound,
+ self.controller.show, req, 'default')
+
def test_view_builder_show(self):
view_builder = views_types.ViewBuilder()
now = timeutils.isotime()
raw_volume_type = dict(
name='new_type',
+ description='new_type_desc',
deleted=False,
created_at=now,
updated_at=now,
self.assertIn('volume_type', output)
expected_volume_type = dict(
name='new_type',
+ description='new_type_desc',
extra_specs={},
id=42,
)
raw_volume_types.append(
dict(
name='new_type',
+ description='new_type_desc',
deleted=False,
created_at=now,
updated_at=now,
for i in range(0, 10):
expected_volume_type = dict(
name='new_type',
+ description='new_type_desc',
extra_specs={},
id=42 + i
)
def _verify_volume_type(self, vtype, tree):
self.assertEqual('volume_type', tree.tag)
self.assertEqual(vtype['name'], tree.get('name'))
+ self.assertEqual(vtype['description'], tree.get('description'))
self.assertEqual(str(vtype['id']), tree.get('id'))
self.assertEqual(1, len(tree))
extra_specs = tree[0]
encryptions = db_utils.get_table(engine, 'encryption')
self.assertNotIn('encryption_id', encryptions.c)
+ def _check_034(self, engine, data):
+ """Test adding description columns to volume_types table."""
+ volume_types = db_utils.get_table(engine, 'volume_types')
+ self.assertIsInstance(volume_types.c.description.type,
+ sqlalchemy.types.VARCHAR)
+
+ def _post_downgrade_034(self, engine):
+ volume_types = db_utils.get_table(engine, 'volume_types')
+ self.assertNotIn('description', volume_types.c)
+
def test_walk_versions(self):
self.walk_versions(True, False)
size="300",
rpm="7200",
visible="True")
+ self.vol_type1_description = self.vol_type1_name + '_desc'
def test_volume_type_create_then_destroy(self):
"""Ensure volume types can be created and deleted."""
prev_all_vtypes = volume_types.get_all_types(self.ctxt)
+ # create
type_ref = volume_types.create(self.ctxt,
self.vol_type1_name,
- self.vol_type1_specs)
+ self.vol_type1_specs,
+ description=self.vol_type1_description)
new = volume_types.get_volume_type_by_name(self.ctxt,
self.vol_type1_name)
LOG.info(_("Given data: %s"), self.vol_type1_specs)
LOG.info(_("Result data: %s"), new)
+ self.assertEqual(self.vol_type1_description, new['description'])
+
for k, v in self.vol_type1_specs.iteritems():
self.assertEqual(v, new['extra_specs'][k],
'one of fields does not match')
len(new_all_vtypes),
'drive type was not created')
+ # update
+ new_type_desc = self.vol_type1_description + '_updated'
+ type_ref_updated = volume_types.update(self.ctxt,
+ type_ref.id,
+ new_type_desc)
+ self.assertEqual(new_type_desc, type_ref_updated['description'])
+
+ # destroy
volume_types.destroy(self.ctxt, type_ref['id'])
new_all_vtypes = volume_types.get_all_types(self.ctxt)
self.assertEqual(prev_all_vtypes,
from oslo.config import cfg
from oslo.db import exception as db_exc
+import six
from cinder import context
from cinder import db
LOG = logging.getLogger(__name__)
-def create(context, name, extra_specs=None, is_public=True, projects=None):
+def create(context,
+ name,
+ extra_specs=None,
+ is_public=True,
+ projects=None,
+ description=None):
"""Creates volume types."""
extra_specs = extra_specs or {}
projects = projects or []
type_ref = db.volume_type_create(context,
dict(name=name,
extra_specs=extra_specs,
- is_public=is_public),
+ is_public=is_public,
+ description=description),
projects=projects)
except db_exc.DBError as e:
- LOG.exception(_LE('DB error: %s') % e)
+ LOG.exception(_LE('DB error: %s') % six.text_type(e))
raise exception.VolumeTypeCreateFailed(name=name,
extra_specs=extra_specs)
return type_ref
+def update(context, id, description):
+ """Update volume type by id."""
+ if id is None:
+ msg = _("id cannot be None")
+ raise exception.InvalidVolumeType(reason=msg)
+ try:
+ type_updated = db.volume_type_update(context,
+ id,
+ dict(description=description))
+ except db_exc.DBError as e:
+ LOG.exception(_LE('DB error: %s') % six.text_type(e))
+ raise exception.VolumeTypeUpdateFailed(id=id)
+ return type_updated
+
+
def destroy(context, id):
"""Marks volume types as deleted."""
if id is None:
# Couldn't find volume type with the name in default_volume_type
# flag, record this issue and move on
#TODO(zhiteng) consider add notification to warn admin
- LOG.exception(_LE('Default volume type is not found, '
- 'please check default_volume_type '
- 'config: %s'), e)
+ LOG.exception(_LE('Default volume type is not found,'
+ 'please check default_volume_type config: %s') %
+ six.text_type(e))
return vol_type