--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""The volume encryption metadata extension."""
+
+import webob
+
+from cinder.api import extensions
+from cinder.api.openstack import wsgi
+from cinder.api import xmlutil
+from cinder import db
+from cinder import exception
+from cinder.openstack.common.notifier import api as notifier_api
+from cinder.volume import volume_types
+
+authorize = extensions.extension_authorizer('volume',
+ 'volume_encryption_metadata')
+
+
+class VolumeEncryptionMetadataTemplate(xmlutil.TemplateBuilder):
+ def construct(self):
+ root = xmlutil.make_flat_dict('encryption', selector='encryption')
+ return xmlutil.MasterTemplate(root, 1)
+
+
+class VolumeEncryptionMetadataController(wsgi.Controller):
+ """The volume encryption metadata API extension"""
+
+ def _get_volume_encryption_metadata(self, context, volume_id):
+ return db.volume_encryption_metadata_get(context, volume_id)
+
+ def _is_volume_type_encrypted(self, context, volume_id):
+ volume_ref = db.volume_get(context, volume_id)
+ volume_type_id = volume_ref['volume_type_id']
+ return volume_types.is_encrypted(context, volume_type_id)
+
+ def _get_metadata(self, req, volume_id):
+ context = req.environ['cinder.context']
+ authorize(context)
+ if self._is_volume_type_encrypted(context, volume_id):
+ return self._get_volume_encryption_metadata(context, volume_id)
+ else:
+ return {
+ 'encryption_key_id': None,
+ # Additional metadata defaults could go here.
+ }
+
+ @wsgi.serializers(xml=VolumeEncryptionMetadataTemplate)
+ def index(self, req, volume_id):
+ """Returns the encryption metadata for a given volume."""
+ return self._get_metadata(req, volume_id)
+
+ @wsgi.serializers(xml=VolumeEncryptionMetadataTemplate)
+ def show(self, req, volume_id, id):
+ """Return a single encryption item."""
+ encryption_item = self.index(req, volume_id)
+ if encryption_item is not None:
+ return encryption_item[id]
+ else:
+ return None
+
+
+class Volume_encryption_metadata(extensions.ExtensionDescriptor):
+ """Volume encryption metadata retrieval support."""
+
+ name = "VolumeEncryptionMetadata"
+ alias = "os-volume-encryption-metadata"
+ namespace = ("http://docs.openstack.org/volume/ext/"
+ "os-volume-encryption-metadata/api/v1")
+ updated = "2013-07-10T00:00:00+00:00"
+
+ def get_resources(self):
+ resources = []
+ res = extensions.ResourceExtension(
+ 'encryption', VolumeEncryptionMetadataController(),
+ parent=dict(member_name='volume', collection_name='volumes'))
+ resources.append(res)
+ return resources
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""The volume types encryption extension."""
+
+import webob
+
+from cinder.api import extensions
+from cinder.api.openstack import wsgi
+from cinder.api import xmlutil
+from cinder import db
+from cinder import exception
+from cinder.openstack.common.notifier import api as notifier_api
+from cinder.volume import volume_types
+
+authorize = extensions.extension_authorizer('volume',
+ 'volume_type_encryption')
+
+CONTROL_LOCATION = ['front-end', 'back-end']
+
+
+class VolumeTypeEncryptionTemplate(xmlutil.TemplateBuilder):
+ def construct(self):
+ root = xmlutil.make_flat_dict('encryption', selector='encryption')
+ return xmlutil.MasterTemplate(root, 1)
+
+
+class VolumeTypeEncryptionController(wsgi.Controller):
+ """The volume type encryption API controller for the OpenStack API """
+
+ def _get_volume_type_encryption(self, context, type_id):
+ encryption_ref = db.volume_type_encryption_get(context, type_id)
+ encryption_specs = {}
+ if not encryption_ref:
+ return encryption_specs
+ for key, value in encryption_ref.iteritems():
+ encryption_specs[key] = value
+ return encryption_specs
+
+ def _check_type(self, context, type_id):
+ try:
+ volume_types.get_volume_type(context, type_id)
+ except exception.NotFound as ex:
+ raise webob.exc.HTTPNotFound(explanation=unicode(ex))
+
+ def _check_encryption_input(self, encryption, create=True):
+ if 'key_size' in encryption.keys():
+ key_size = encryption['key_size']
+ if key_size is not None:
+ if isinstance(key_size, (int, long)):
+ if key_size < 0:
+ msg = _('key_size must be non-negative')
+ raise exception.InvalidInput(reason=msg)
+ else:
+ msg = _('key_size must be an integer')
+ raise exception.InvalidInput(reason=msg)
+
+ if create:
+ msg = None
+ if 'provider' not in encryption.keys():
+ msg = _('provider must be defined')
+ elif 'control_location' not in encryption.keys():
+ msg = _('control_location must be defined')
+
+ if msg is not None:
+ raise exception.InvalidInput(reason=msg)
+
+ # Check control location
+ if 'control_location' in encryption.keys():
+ if encryption['control_location'] not in CONTROL_LOCATION:
+ msg = _("Valid control location are: %s") % CONTROL_LOCATION
+ raise exception.InvalidInput(reason=msg)
+
+ @wsgi.serializers(xml=VolumeTypeEncryptionTemplate)
+ def index(self, req, type_id):
+ """Returns the encryption specs for a given volume type."""
+ context = req.environ['cinder.context']
+ authorize(context)
+ self._check_type(context, type_id)
+ return self._get_volume_type_encryption(context, type_id)
+
+ @wsgi.serializers(xml=VolumeTypeEncryptionTemplate)
+ def create(self, req, type_id, body=None):
+ """Create encryption specs for an existing volume type."""
+ context = req.environ['cinder.context']
+ authorize(context)
+
+ if not self.is_valid_body(body, 'encryption'):
+ expl = _('Create body is not valid.')
+ raise webob.exc.HTTPBadRequest(explanation=expl)
+
+ self._check_type(context, type_id)
+
+ encryption_specs = self._get_volume_type_encryption(context, type_id)
+ if encryption_specs:
+ raise exception.VolumeTypeEncryptionExists(type_id=type_id)
+
+ encryption_specs = body['encryption']
+
+ self._check_encryption_input(encryption_specs)
+
+ db.volume_type_encryption_update_or_create(context, type_id,
+ encryption_specs)
+ notifier_info = dict(type_id=type_id, specs=encryption_specs)
+ notifier_api.notify(context, 'volumeTypeEncryption',
+ 'volume_type_encryption.create',
+ notifier_api.INFO, notifier_info)
+ return body
+
+ @wsgi.serializers(xml=VolumeTypeEncryptionTemplate)
+ def show(self, req, type_id, id):
+ """Return a single encryption item."""
+ context = req.environ['cinder.context']
+ authorize(context)
+
+ self._check_type(context, type_id)
+
+ encryption_specs = self._get_volume_type_encryption(context, type_id)
+
+ if id not in encryption_specs:
+ raise webob.exc.HTTPNotFound()
+
+ return {id: encryption_specs[id]}
+
+
+class Volume_type_encryption(extensions.ExtensionDescriptor):
+ """Encryption support for volume types."""
+
+ name = "VolumeTypeEncryption"
+ alias = "encryption"
+ namespace = ("http://docs.openstack.org/volume/ext/"
+ "volume-type-encryption/api/v1")
+ updated = "2013-07-01T00:00:00+00:00"
+
+ def get_resources(self):
+ resources = []
+ res = extensions.ResourceExtension(
+ Volume_type_encryption.alias,
+ VolumeTypeEncryptionController(),
+ parent=dict(member_name='type', collection_name='types'))
+ resources.append(res)
+ return resources
+
+ def get_controller_extensions(self):
+ controller = VolumeTypeEncryptionController()
+ extension = extensions.ControllerExtension(self, 'types', controller)
+ return [extension]
###################
+def volume_type_encryption_get(context, volume_type_id, session=None):
+ return IMPL.volume_type_encryption_get(context, volume_type_id, session)
+
+
+def volume_type_encryption_delete(context, volume_type_id):
+ return IMPL.volume_type_encryption_delete(context, volume_type_id)
+
+
+# TODO(joel-coffman): split into two functions -- update and create
+def volume_type_encryption_update_or_create(context, volume_type_id,
+ encryption_specs):
+ return IMPL.volume_type_encryption_update_or_create(context,
+ volume_type_id,
+ encryption_specs)
+
+
+def volume_type_encryption_volume_get(context, volume_type_id, session=None):
+ return IMPL.volume_type_encryption_volume_get(context, volume_type_id,
+ session)
+
+
+###################
+
+
+def volume_encryption_metadata_get(context, volume_id, session=None):
+ return IMPL.volume_encryption_metadata_get(context, volume_id, session)
+
+
+###################
+
+
def volume_glance_metadata_create(context, volume_id, key, value):
"""Update the Glance metadata for the specified volume."""
return IMPL.volume_glance_metadata_create(context,
####################
+@require_context
+def volume_type_encryption_get(context, volume_type_id, session=None):
+ return model_query(context, models.Encryption, session=session,
+ read_deleted="no").\
+ filter_by(volume_type_id=volume_type_id).first()
+
+
+@require_admin_context
+def volume_type_encryption_delete(context, volume_type_id):
+ session = get_session()
+ with session.begin():
+ encryption = volume_type_encryption_get(context, volume_type_id,
+ session)
+ encryption.update({'deleted': True,
+ 'deleted_at': timeutils.utcnow(),
+ 'updated_at': literal_column('updated_at')})
+
+
+# TODO(joel-coffman): split into two functions -- update and create
+@require_admin_context
+def volume_type_encryption_update_or_create(context, volume_type_id,
+ values):
+ session = get_session()
+ encryption = volume_type_encryption_get(context, volume_type_id,
+ session)
+
+ if not encryption:
+ encryption = models.Encryption()
+
+ if 'volume_type_id' not in values:
+ values['volume_type_id'] = volume_type_id
+
+ encryption.update(values)
+ encryption.save(session=session)
+
+ return encryption
+
+
+def volume_type_encryption_volume_get(context, volume_type_id, session=None):
+ volume_list = _volume_get_query(context, session=session,
+ project_only=False).\
+ filter_by(volume_type_id=volume_type_id).\
+ all()
+ return volume_list
+
+####################
+
+
+@require_admin_context
+def volume_encryption_metadata_get(context, volume_id, session=None):
+ """Return the encryption key id for a given volume."""
+
+ volume_ref = _volume_get(context, volume_id)
+ encryption_ref = volume_type_encryption_get(context,
+ volume_ref['volume_type_id'])
+
+ return {
+ 'encryption_key_id': volume_ref['encryption_key_id'],
+ 'control_location': encryption_ref['control_location'],
+ 'cipher': encryption_ref['cipher'],
+ 'key_size': encryption_ref['key_size'],
+ 'provider': encryption_ref['provider'],
+ }
+
+
+####################
+
+
@require_context
@require_volume_exists
def _volume_glance_metadata_get(context, volume_id, session=None):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, ForeignKey, MetaData, Table
+from sqlalchemy import Boolean, DateTime, Integer, String
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
+from cinder.openstack.common import uuidutils
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _populate_encryption_types(volume_types, encryption):
+ # TODO(joel-coffman): The database currently doesn't enforce uniqueness
+ # for volume type names.
+ default_encryption_types = {
+ 'dm-crypt': {
+ 'cipher': 'aes-xts-plain64',
+ 'control_location': 'front-end',
+ 'key_size': 512, # only half of key is used for cipher in XTS mode
+ 'provider':
+ 'nova.volume.encryptors.cryptsetup.CryptsetupEncryptor',
+ },
+ 'LUKS': {
+ 'cipher': 'aes-xts-plain64',
+ 'control_location': 'front-end',
+ 'key_size': 512, # only half of key is used for cipher in XTS mode
+ 'provider': 'nova.volume.encryptors.luks.LuksEncryptor',
+ },
+ }
+
+ try:
+ volume_types_insert = volume_types.insert()
+ encryption_insert = encryption.insert()
+
+ for key, values in default_encryption_types.iteritems():
+ current_time = timeutils.utcnow()
+ volume_type = {
+ 'id': uuidutils.generate_uuid(),
+ 'name': key,
+ 'created_at': current_time,
+ 'updated_at': current_time,
+ 'deleted': False,
+ }
+ volume_types_insert.execute(volume_type)
+
+ values['id'] = uuidutils.generate_uuid()
+ values['volume_type_id'] = volume_type['id']
+
+ values['created_at'] = timeutils.utcnow()
+ values['updated_at'] = values['created_at']
+ values['deleted'] = False
+
+ encryption_insert.execute(values)
+ except Exception:
+ LOG.error(_("Error populating default encryption types!"))
+ # NOTE(joel-coffman): do not raise because deployed environment may
+ # have volume types already defined with the same name
+
+
+def upgrade(migrate_engine):
+ meta = MetaData(bind=migrate_engine)
+
+ # encryption key UUID -- must be stored per volume
+ volumes = Table('volumes', meta, autoload=True)
+ encryption_key = Column('encryption_key_id', String(36))
+ try:
+ volumes.create_column(encryption_key)
+ except Exception:
+ LOG.error(_("Column |%s| not created!"), repr(encryption_key))
+ raise
+
+ # encryption key UUID and volume type id -- must be stored per snapshot
+ snapshots = Table('snapshots', meta, autoload=True)
+ encryption_key = Column('encryption_key_id', String(36))
+ try:
+ snapshots.create_column(encryption_key)
+ except Exception:
+ LOG.error(_("Column |%s| not created!"), repr(encryption_key))
+ raise
+ volume_type = Column('volume_type_id', String(36))
+ try:
+ snapshots.create_column(volume_type)
+ except Exception:
+ LOG.error(_("Column |%s| not created!"), repr(volume_type))
+ raise
+
+ volume_types = Table('volume_types', meta, autoload=True)
+
+ # encryption types associated with particular volume type
+ encryption = Table(
+ 'encryption', meta,
+ Column('created_at', DateTime(timezone=False)),
+ Column('updated_at', DateTime(timezone=False)),
+ Column('deleted_at', DateTime(timezone=False)),
+ Column('deleted', Boolean(create_constraint=True, name=None)),
+ Column('cipher', String(length=255)),
+ Column('control_location', String(length=255), nullable=False),
+ Column('key_size', Integer),
+ Column('provider', String(length=255), nullable=False),
+ # NOTE(joel-coffman): The volume_type_id must be unique or else the
+ # referenced volume type becomes ambiguous. That is, specifying the
+ # volume type is not sufficient to identify a particular encryption
+ # scheme unless each volume type is associated with at most one
+ # encryption scheme.
+ Column('volume_type_id', String(length=36),
+ ForeignKey(volume_types.c.id),
+ primary_key=True, nullable=False),
+ mysql_engine='InnoDB'
+ )
+
+ try:
+ encryption.create()
+ except Exception:
+ LOG.error(_("Table |%s| not created!"), repr(encryption))
+ raise
+
+ _populate_encryption_types(volume_types, encryption)
+
+
+def downgrade(migrate_engine):
+ meta = MetaData(bind=migrate_engine)
+
+ # drop encryption key UUID for volumes
+ volumes = Table('volumes', meta, autoload=True)
+ try:
+ volumes.c.encryption_key_id.drop()
+ except Exception:
+ LOG.error(_("encryption_key_id column not dropped from volumes"))
+ raise
+
+ # drop encryption key UUID and volume type id for snapshots
+ snapshots = Table('snapshots', meta, autoload=True)
+ try:
+ snapshots.c.encryption_key_id.drop()
+ except Exception:
+ LOG.error(_("encryption_key_id column not dropped from snapshots"))
+ raise
+ try:
+ snapshots.c.volume_type_id.drop()
+ except Exception:
+ LOG.error(_("volume_type_id column not dropped from snapshots"))
+ raise
+
+ # drop encryption types table
+ encryption = Table('encryption', meta, autoload=True)
+ try:
+ encryption.drop()
+ except Exception:
+ LOG.error(_("encryption table not dropped"))
+ raise
+
+ # TODO(joel-coffman): Should remove volume_types related to encryption...
volume_type_id = Column(String(36))
source_volid = Column(String(36))
+ encryption_key_id = Column(String(36))
+
deleted = Column(Boolean, default=False)
bootable = Column(Boolean, default=False)
display_name = Column(String(255))
display_description = Column(String(255))
+ encryption_key_id = Column(String(36))
+ volume_type_id = Column(String(36))
+
provider_location = Column(String(255))
volume = relationship(Volume, backref="snapshots",
object_count = Column(Integer)
+class Encryption(BASE, CinderBase):
+ """Represents encryption requirement for a volume type.
+
+ Encryption here is a set of performance characteristics describing
+ cipher, provider, and key_size for a certain volume type.
+ """
+
+ __tablename__ = 'encryption'
+ cipher = Column(String(255))
+ key_size = Column(Integer)
+ provider = Column(String(255))
+ control_location = Column(String(255))
+ volume_type_id = Column(String(36),
+ ForeignKey('volume_types.id'),
+ primary_key=True)
+ volume_type = relationship(
+ VolumeTypes,
+ backref="encryption",
+ foreign_keys=volume_type_id,
+ primaryjoin='and_('
+ 'Encryption.volume_type_id == VolumeTypes.id,'
+ 'Encryption.deleted == False)'
+ )
+
+
class Transfer(BASE, CinderBase):
"""Represents a volume transfer request."""
__tablename__ = 'transfers'
message = _("Volume Type %(id)s already exists.")
+class VolumeTypeEncryptionExists(Invalid):
+ message = _("Volume type encryption for type %(type_id)s already exists.")
+
+
class MigrationError(CinderException):
message = _("Migration error") + ": %(reason)s"
keymgr_opts = [
cfg.StrOpt('keymgr_api_class',
- default='cinder.keymgr.key_mgr.KeyManager',
+ default='cinder.keymgr.'
+ 'not_implemented_key_mgr.NotImplementedKeyManager',
help='The full class name of the key manager API class'),
]
"""
pass
+ @abc.abstractmethod
+ def copy_key(self, ctxt, key_id, **kwargs):
+ """Copies (i.e., clones) a key stored by the key manager.
+
+ This method copies the specified key and returns the copy's UUID. If
+ the specified context does not permit copying keys, then a
+ NotAuthorized error should be raised.
+
+ Implementation note: This method should behave identically to
+ store_key(context, get_key(context, <encryption key UUID>))
+ although it is preferable to perform this operation within the key
+ manager to avoid unnecessary handling of the key material.
+ """
+ pass
+
@abc.abstractmethod
def get_key(self, ctxt, key_id, **kwargs):
"""Retrieves the specified key.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Key manager implementation that raises NotImplementedError
+"""
+
+from cinder.keymgr import key_mgr
+
+
+class NotImplementedKeyManager(key_mgr.KeyManager):
+ """Key Manager Interface that raises NotImplementedError for all operations
+ """
+
+ def create_key(self, ctxt, algorithm='AES', length=256, expiration=None,
+ **kwargs):
+ raise NotImplementedError()
+
+ def store_key(self, ctxt, key, expiration=None, **kwargs):
+ raise NotImplementedError()
+
+ def copy_key(self, ctxt, key_id, **kwargs):
+ raise NotImplementedError()
+
+ def get_key(self, ctxt, key_id, **kwargs):
+ raise NotImplementedError()
+
+ def delete_key(self, ctxt, key_id, **kwargs):
+ raise NotImplementedError()
from cinder.api.contrib import quotas
from cinder import context
+from cinder import db
from cinder import test
resources = {'gigabytes': gigabytes,
'snapshots': snapshots,
'volumes': volumes}
+ # need to consider preexisting volume types as well
+ volume_types = db.volume_type_get_all(context.get_admin_context())
+ for volume_type in volume_types:
+ resources['gigabytes_' + volume_type] = -1
+ resources['snapshots_' + volume_type] = -1
+ resources['volumes_' + volume_type] = -1
+
if tenant_id:
resources['id'] = tenant_id
if root:
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import json
+import webob
+
+from cinder.api.contrib import volume_encryption_metadata
+from cinder import context
+from cinder import db
+from cinder import test
+from cinder.tests.api import fakes
+from cinder.volume import volume_types
+
+
+def return_volume_type_encryption_metadata(context, volume_type_id):
+ return stub_volume_type_encryption()
+
+
+def stub_volume_type_encryption():
+ values = {
+ 'cipher': 'cipher',
+ 'key_size': 256,
+ 'provider': 'nova.volume.encryptors.base.VolumeEncryptor',
+ 'volume_type_id': 'volume_type',
+ 'control_location': 'front-end',
+ }
+ return values
+
+
+class VolumeEncryptionMetadataTest(test.TestCase):
+ @staticmethod
+ def _create_volume(context,
+ display_name='test_volume',
+ display_description='this is a test volume',
+ status='creating',
+ availability_zone='fake_az',
+ host='fake_host',
+ size=1):
+ """Create a volume object."""
+ volume = {
+ 'size': size,
+ 'user_id': 'fake',
+ 'project_id': 'fake',
+ 'status': status,
+ 'display_name': display_name,
+ 'display_description': display_description,
+ 'attach_status': 'detached',
+ 'availability_zone': availability_zone,
+ 'host': host,
+ 'encryption_key_id': 'fake_key',
+ }
+ return db.volume_create(context, volume)['id']
+
+ def setUp(self):
+ super(VolumeEncryptionMetadataTest, self).setUp()
+ self.controller = (volume_encryption_metadata.
+ VolumeEncryptionMetadataController())
+ self.stubs.Set(db.sqlalchemy.api, 'volume_type_encryption_get',
+ return_volume_type_encryption_metadata)
+
+ self.ctxt = context.RequestContext('fake', 'fake', is_admin=True)
+ self.volume_id = self._create_volume(self.ctxt)
+
+ def tearDown(self):
+ db.volume_destroy(self.ctxt, self.volume_id)
+ super(VolumeEncryptionMetadataTest, self).tearDown()
+
+ def test_index(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption'
+ % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(200, res.status_code)
+ res_dict = json.loads(res.body)
+
+ expected = {
+ "encryption_key_id": "fake_key",
+ "control_location": "front-end",
+ "cipher": "cipher",
+ "provider": "nova.volume.encryptors.base.VolumeEncryptor",
+ "key_size": 256,
+ }
+ self.assertEqual(expected, res_dict)
+
+ def test_index_bad_tenant_id(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ req = webob.Request.blank('/v2/%s/volumes/%s/encryption'
+ % ('bad-tenant-id', self.volume_id))
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(400, res.status_code)
+
+ res_dict = json.loads(res.body)
+ expected = {'badRequest': {'code': 400,
+ 'message': 'Malformed request url'}}
+ self.assertEqual(expected, res_dict)
+
+ def test_index_bad_volume_id(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ bad_volume_id = 'bad_volume_id'
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption'
+ % bad_volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(404, res.status_code)
+
+ res_dict = json.loads(res.body)
+ expected = {'itemNotFound': {'code': 404,
+ 'message': 'VolumeNotFound: Volume '
+ '%s could not be found.'
+ % bad_volume_id}}
+ self.assertEqual(expected, res_dict)
+
+ def test_show_key(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption/'
+ 'encryption_key_id' % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(200, res.status_code)
+
+ self.assertEqual('fake_key', res.body)
+
+ def test_show_control(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption/'
+ 'control_location' % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(200, res.status_code)
+
+ self.assertEqual('front-end', res.body)
+
+ def test_show_provider(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption/'
+ 'provider' % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(200, res.status_code)
+
+ self.assertEqual('nova.volume.encryptors.base.VolumeEncryptor',
+ res.body)
+
+ def test_show_bad_tenant_id(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ req = webob.Request.blank('/v2/%s/volumes/%s/encryption/'
+ 'encryption_key_id' % ('bad-tenant-id',
+ self.volume_id))
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(400, res.status_code)
+
+ res_dict = json.loads(res.body)
+ expected = {'badRequest': {'code': 400,
+ 'message': 'Malformed request url'}}
+ self.assertEqual(expected, res_dict)
+
+ def test_show_bad_volume_id(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ bad_volume_id = 'bad_volume_id'
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption/'
+ 'encryption_key_id' % bad_volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(404, res.status_code)
+
+ res_dict = json.loads(res.body)
+ expected = {'itemNotFound': {'code': 404,
+ 'message': 'VolumeNotFound: Volume '
+ '%s could not be found.'
+ % bad_volume_id}}
+ self.assertEqual(expected, res_dict)
+
+ def test_retrieve_key_not_admin(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: True)
+
+ ctxt = self.ctxt.deepcopy()
+ ctxt.is_admin = False
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption/'
+ 'encryption_key_id' % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=ctxt))
+ self.assertEqual(403, res.status_code)
+ res_dict = json.loads(res.body)
+
+ expected = {
+ 'forbidden': {
+ 'code': 403,
+ 'message': ("Policy doesn't allow volume_extension:"
+ "volume_encryption_metadata to be performed.")
+ }
+ }
+ self.assertEqual(expected, res_dict)
+
+ def test_show_volume_not_encrypted_type(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: False)
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption/'
+ 'encryption_key_id' % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+ self.assertEqual(200, res.status_code)
+ self.assertEqual(0, len(res.body))
+
+ def test_index_volume_not_encrypted_type(self):
+ self.stubs.Set(volume_types, 'is_encrypted', lambda *a, **kw: False)
+
+ req = webob.Request.blank('/v2/fake/volumes/%s/encryption'
+ % self.volume_id)
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=self.ctxt))
+
+ self.assertEqual(200, res.status_code)
+ res_dict = json.loads(res.body)
+
+ expected = {
+ 'encryption_key_id': None
+ }
+ self.assertEqual(expected, res_dict)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import json
+import webob
+from xml.dom import minidom
+
+from cinder.api.contrib import volume_type_encryption
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common.notifier import api as notifier_api
+from cinder.openstack.common.notifier import test_notifier
+from cinder import test
+from cinder.tests.api import fakes
+from cinder.volume import volume_types
+
+
+def return_volume_type_encryption_db(context, volume_type_id, session):
+ return stub_volume_type_encryption()
+
+
+def return_volume_type_encryption(context, volume_type_id):
+ return stub_volume_type_encryption()
+
+
+def stub_volume_type_encryption():
+ values = {
+ 'cipher': 'fake_cipher',
+ 'control_location': 'front-end',
+ 'key_size': 256,
+ 'provider': 'fake_provider',
+ 'volume_type_id': 'fake_type_id',
+ }
+ return values
+
+
+def volume_type_encryption_get(context, volume_type_id):
+ pass
+
+
+class VolumeTypeEncryptionTest(test.TestCase):
+
+ def setUp(self):
+ super(VolumeTypeEncryptionTest, self).setUp()
+ self.flags(connection_type='fake',
+ host='fake',
+ notification_driver=[test_notifier.__name__])
+ self.api_path = '/v2/fake/os-volume-types/1/encryption'
+ """to reset notifier drivers left over from other api/contrib tests"""
+ notifier_api._reset_drivers()
+ test_notifier.NOTIFICATIONS = []
+
+ def tearDown(self):
+ notifier_api._reset_drivers()
+ super(VolumeTypeEncryptionTest, self).tearDown()
+
+ def _get_response(self, volume_type, admin=True,
+ url='/v2/fake/types/%s/encryption',
+ req_method='GET', req_body=None,
+ req_headers=None):
+ ctxt = context.RequestContext('fake', 'fake', is_admin=admin)
+
+ req = webob.Request.blank(url % volume_type['id'])
+ req.method = req_method
+ req.body = req_body
+ if req_headers:
+ req.headers['Content-Type'] = req_headers
+
+ return req.get_response(fakes.wsgi_app(fake_auth_context=ctxt))
+
+ def test_index(self):
+ self.stubs.Set(db, 'volume_type_encryption_get',
+ return_volume_type_encryption)
+
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ res = self._get_response(volume_type)
+ self.assertEqual(200, res.status_code)
+ res_dict = json.loads(res.body)
+
+ expected = stub_volume_type_encryption()
+ self.assertEqual(expected, res_dict)
+
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def test_index_invalid_type(self):
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+
+ res = self._get_response(volume_type)
+ self.assertEqual(404, res.status_code)
+ res_dict = json.loads(res.body)
+
+ expected = {
+ 'itemNotFound': {
+ 'code': 404,
+ 'message': ('Volume type %s could not be found.'
+ % volume_type['id'])
+ }
+ }
+ self.assertEqual(expected, res_dict)
+
+ def test_show_key_size(self):
+ self.stubs.Set(db, 'volume_type_encryption_get',
+ return_volume_type_encryption)
+
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ res = self._get_response(volume_type,
+ url='/v2/fake/types/%s/encryption/key_size')
+ res_dict = json.loads(res.body)
+
+ self.assertEqual(200, res.status_code)
+ self.assertEqual(256, res_dict['key_size'])
+
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def test_show_provider(self):
+ self.stubs.Set(db, 'volume_type_encryption_get',
+ return_volume_type_encryption)
+
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ res = self._get_response(volume_type,
+ url='/v2/fake/types/%s/encryption/provider')
+ res_dict = json.loads(res.body)
+
+ self.assertEqual(200, res.status_code)
+ self.assertEqual('fake_provider', res_dict['provider'])
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def test_show_item_not_found(self):
+ self.stubs.Set(db, 'volume_type_encryption_get',
+ return_volume_type_encryption)
+
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ res = self._get_response(volume_type,
+ url='/v2/fake/types/%s/encryption/fake')
+ res_dict = json.loads(res.body)
+
+ self.assertEqual(404, res.status_code)
+ expected = {
+ 'itemNotFound': {
+ 'code': 404,
+ 'message': ('The resource could not be found.')
+ }
+ }
+ self.assertEqual(expected, res_dict)
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def _create(self, cipher, control_location, key_size, provider):
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ body = {"encryption": {'cipher': cipher,
+ 'control_location': control_location,
+ 'key_size': key_size,
+ 'provider': provider,
+ 'volume_type_id': volume_type['id']}}
+
+ self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ res = self._get_response(volume_type)
+ res_dict = json.loads(res.body)
+ self.assertEqual(200, res.status_code)
+ # Confirm that volume type has no encryption information
+ # before create.
+ self.assertEqual('{}', res.body)
+
+ # Create encryption specs for the volume type
+ # with the defined body.
+ res = self._get_response(volume_type, req_method='POST',
+ req_body=json.dumps(body),
+ req_headers='application/json')
+ res_dict = json.loads(res.body)
+
+ self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
+
+ # check response
+ self.assertTrue('encryption' in res_dict)
+ self.assertEqual(cipher, res_dict['encryption']['cipher'])
+ self.assertEqual(control_location,
+ res_dict['encryption']['control_location'])
+ self.assertEqual(key_size, res_dict['encryption']['key_size'])
+ self.assertEqual(provider, res_dict['encryption']['provider'])
+ self.assertEqual(volume_type['id'],
+ res_dict['encryption']['volume_type_id'])
+
+ # check database
+ encryption = db.volume_type_encryption_get(context.get_admin_context(),
+ volume_type['id'])
+ self.assertIsNotNone(encryption)
+ self.assertEqual(cipher, encryption['cipher'])
+ self.assertEqual(key_size, encryption['key_size'])
+ self.assertEqual(provider, encryption['provider'])
+ self.assertEqual(volume_type['id'], encryption['volume_type_id'])
+
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def test_create_json(self):
+ self._create('fake_cipher', 'front-end', 128, 'fake_encryptor')
+
+ def test_create_xml(self):
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ ctxt = context.RequestContext('fake', 'fake', is_admin=True)
+
+ req = webob.Request.blank('/v2/fake/types/%s/encryption'
+ % volume_type['id'])
+ req.method = 'POST'
+ req.body = ('<encryption provider="test_provider" '
+ 'cipher="cipher" control_location="front-end" />')
+ req.headers['Content-Type'] = 'application/xml'
+ req.headers['Accept'] = 'application/xml'
+ res = req.get_response(fakes.wsgi_app(fake_auth_context=ctxt))
+
+ self.assertEqual(res.status_int, 200)
+
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def test_create_invalid_volume_type(self):
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+
+ body = {"encryption": {'cipher': 'cipher',
+ 'control_location': 'front-end',
+ 'key_size': 128,
+ 'provider': 'fake_provider',
+ 'volume_type_id': 'volume_type'}}
+
+ res = self._get_response(volume_type, req_method='POST',
+ req_body=json.dumps(body),
+ req_headers='application/json')
+ res_dict = json.loads(res.body)
+
+ self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
+ self.assertEqual(404, res.status_code)
+
+ expected = {
+ 'itemNotFound': {
+ 'code': 404,
+ 'message': ('Volume type %s could not be found.'
+ % volume_type['id'])
+ }
+ }
+ self.assertEqual(expected, res_dict)
+
+ def test_create_encryption_type_exists(self):
+ self.stubs.Set(db, 'volume_type_encryption_get',
+ return_volume_type_encryption)
+
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+
+ body = {"encryption": {'cipher': 'cipher',
+ 'control_location': 'front-end',
+ 'key_size': 128,
+ 'provider': 'fake_provider',
+ 'volume_type_id': volume_type['id']}}
+
+ # Try to create encryption specs for a volume type
+ # that already has them.
+ res = self._get_response(volume_type, req_method='POST',
+ req_body=json.dumps(body),
+ req_headers='application/json')
+ res_dict = json.loads(res.body)
+
+ expected = {
+ 'badRequest': {
+ 'code': 400,
+ 'message': ('Volume type encryption for type '
+ 'fake_type_id already exists.')
+ }
+ }
+ self.assertEqual(expected, res_dict)
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def _encryption_create_bad_body(self, body,
+ msg='Create body is not valid.'):
+ volume_type = {
+ 'id': 'fake_type_id',
+ 'name': 'fake_type',
+ }
+ db.volume_type_create(context.get_admin_context(), volume_type)
+ res = self._get_response(volume_type, req_method='POST',
+ req_body=json.dumps(body),
+ req_headers='application/json')
+
+ res_dict = json.loads(res.body)
+
+ expected = {
+ 'badRequest': {
+ 'code': 400,
+ 'message': (msg)
+ }
+ }
+ self.assertEqual(expected, res_dict)
+ db.volume_type_destroy(context.get_admin_context(), volume_type['id'])
+
+ def test_create_no_body(self):
+ self._encryption_create_bad_body(body=None)
+
+ def test_create_malformed_entity(self):
+ body = {'encryption': 'string'}
+ self._encryption_create_bad_body(body=body)
+
+ def test_create_negative_key_size(self):
+ body = {"encryption": {'cipher': 'cipher',
+ 'key_size': -128,
+ 'provider': 'fake_provider',
+ 'volume_type_id': 'volume_type'}}
+ msg = 'Invalid input received: key_size must be non-negative'
+ self._encryption_create_bad_body(body=body, msg=msg)
+
+ def test_create_none_key_size(self):
+ self._create('fake_cipher', 'front-end', None, 'fake_encryptor')
+
+ def test_create_invalid_control_location(self):
+ body = {"encryption": {'cipher': 'cipher',
+ 'control_location': 'fake_control',
+ 'provider': 'fake_provider',
+ 'volume_type_id': 'volume_type'}}
+ msg = ("Invalid input received: Valid control location are: "
+ "['front-end', 'back-end']")
+ self._encryption_create_bad_body(body=body, msg=msg)
+
+ def test_create_no_provider(self):
+ body = {"encryption": {'cipher': 'cipher',
+ 'volume_type_id': 'volume_type'}}
+ msg = ("Invalid input received: provider must be defined")
+ self._encryption_create_bad_body(body=body, msg=msg)
'name': 'fake',
'host': 'fake-host',
'status': 'available',
+ 'encryption_key_id': None,
+ 'volume_type_id': None,
'metadata': {}}
'name': 'fake',
'host': 'fake-host',
'status': 'available',
+ 'encryption_key_id': None,
+ 'volume_type_id': None,
'metadata': {}}
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Justin Santa Barbara
+# Copyright 2012 OpenStack LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Implementation of a fake key manager."""
+
+
+from cinder.tests.keymgr import mock_key_mgr
+
+
+def fake_api():
+ return mock_key_mgr.MockKeyManager()
return self.store_key(ctxt, _key)
+ def _generate_key_id(self):
+ key_id = uuidutils.generate_uuid()
+ while key_id in self.keys:
+ key_id = uuidutils.generate_uuid()
+
+ return key_id
+
def store_key(self, ctxt, key, **kwargs):
"""Stores (i.e., registers) a key with the key manager.
"""
if ctxt is None:
raise exception.NotAuthorized()
- # generate UUID and ensure that it isn't in use
- key_id = uuidutils.generate_uuid()
- while key_id in self.keys:
- key_id = uuidutils.generate_uuid()
-
+ key_id = self._generate_key_id()
self.keys[key_id] = key
return key_id
+ def copy_key(self, ctxt, key_id, **kwargs):
+ if ctxt is None:
+ raise exception.NotAuthorized()
+
+ copied_key_id = self._generate_key_id()
+ self.keys[copied_key_id] = self.keys[key_id]
+
+ return copied_key_id
+
def get_key(self, ctxt, key_id, **kwargs):
"""Retrieves the key identified by the specified id.
self.assertRaises(exception.NotAuthorized,
self.key_mgr.store_key, None, None)
+ def test_copy_key(self):
+ key_id = self.key_mgr.create_key(self.ctxt)
+ key = self.key_mgr.get_key(self.ctxt, key_id)
+
+ copied_key_id = self.key_mgr.copy_key(self.ctxt, key_id)
+ copied_key = self.key_mgr.get_key(self.ctxt, copied_key_id)
+
+ self.assertNotEqual(key_id, copied_key_id)
+ self.assertEqual(key, copied_key)
+
+ def test_copy_null_context(self):
+ self.assertRaises(exception.NotAuthorized,
+ self.key_mgr.copy_key, None, None)
+
def test_get_key(self):
pass
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Test cases for the not implemented key manager.
+"""
+
+from cinder.keymgr import not_implemented_key_mgr
+from cinder.tests.keymgr import test_key_mgr
+
+
+class NotImplementedKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
+
+ def _create_key_manager(self):
+ return not_implemented_key_mgr.NotImplementedKeyManager()
+
+ def setUp(self):
+ super(NotImplementedKeyManagerTestCase, self).setUp()
+
+ def test_create_key(self):
+ self.assertRaises(NotImplementedError,
+ self.key_mgr.create_key, None)
+
+ def test_store_key(self):
+ self.assertRaises(NotImplementedError,
+ self.key_mgr.store_key, None, None)
+
+ def test_copy_key(self):
+ self.assertRaises(NotImplementedError,
+ self.key_mgr.copy_key, None, None)
+
+ def test_get_key(self):
+ self.assertRaises(NotImplementedError,
+ self.key_mgr.get_key, None, None)
+
+ def test_delete_key(self):
+ self.assertRaises(NotImplementedError,
+ self.key_mgr.delete_key, None, None)
"volume_extension:volume_actions:upload_image": [],
"volume_extension:types_manage": [],
"volume_extension:types_extra_specs": [],
+ "volume_extension:volume_type_encryption": [["rule:admin_api"]],
+ "volume_extension:volume_encryption_metadata": [["rule:admin_api"]],
"volume_extension:extended_snapshot_attributes": [],
"volume_extension:volume_image_metadata": [],
"volume_extension:volume_host_attribute": [["rule:admin_api"]],
self.assertEquals(should_be, db.snapshot_metadata_get(self.ctxt, 1))
+class DBAPIEncryptionTestCase(BaseTest):
+
+ """Tests for the db.api.volume_type_encryption_* methods."""
+
+ _ignored_keys = [
+ 'deleted',
+ 'deleted_at',
+ 'created_at',
+ 'updated_at',
+ ]
+
+ def setUp(self):
+ super(DBAPIEncryptionTestCase, self).setUp()
+ self.created = \
+ [db.volume_type_encryption_update_or_create(self.ctxt, 'fake_type',
+ values)
+ for values in self._get_values()]
+
+ def _get_values(self, one=False):
+ base_values = {
+ 'cipher': 'fake_cipher',
+ 'key_size': 256,
+ 'provider': 'fake_provider',
+ 'volume_type_id': 'fake_type',
+ 'control_location': 'front-end',
+ }
+ if one:
+ return base_values
+
+ def compose(val, step):
+ if isinstance(val, str):
+ step = str(step)
+ return val + step
+
+ return [dict([(k, compose(v, i)) for k, v in base_values.items()])
+ for i in range(1, 4)]
+
+ def test_volume_type_encryption_update_or_create(self):
+ values = self._get_values()
+ for i, encryption in enumerate(self.created):
+ print "values[%s] = %s" % (i, values[i])
+ print "encryption = %s" % encryption.__dict__
+ self._assertEqualObjects(values[i], encryption,
+ self._ignored_keys)
+
+ def test_volume_type_encryption_get(self):
+ for encryption in self.created:
+ print "encryption = %s" % encryption.__dict__
+ encryption_get = \
+ db.volume_type_encryption_get(self.ctxt,
+ encryption['volume_type_id'])
+ print "encryption_get = %s" % encryption_get.__dict__
+ self._assertEqualObjects(encryption, encryption_get,
+ self._ignored_keys)
+
+ def test_volume_type_encryption_delete(self):
+ values = {
+ 'cipher': 'fake_cipher',
+ 'key_size': 256,
+ 'provider': 'fake_provider',
+ 'volume_type_id': 'fake_type',
+ 'control_location': 'front-end',
+ }
+
+ encryption = db.volume_type_encryption_update_or_create(self.ctxt,
+ 'fake_type',
+ values)
+ self._assertEqualObjects(values, encryption, self._ignored_keys)
+
+ db.volume_type_encryption_delete(self.ctxt,
+ encryption['volume_type_id'])
+ encryption_get = \
+ db.volume_type_encryption_get(self.ctxt,
+ encryption['volume_type_id'])
+ self.assertEqual(None, encryption_get)
+
+
class DBAPIReservationTestCase(BaseTest):
"""Tests for db.api.reservation_* methods."""
'sm_backend_config'))
self.assertTrue(engine.dialect.has_table(engine.connect(),
'sm_volume'))
+
+ def test_migration_017(self):
+ """Test that added encryption information works correctly."""
+ for (key, engine) in self.engines.items():
+ migration_api.version_control(engine,
+ TestMigrations.REPOSITORY,
+ migration.INIT_VERSION)
+
+ # upgrade schema
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 16)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ migration_api.upgrade(engine, TestMigrations.REPOSITORY, 17)
+
+ # encryption key UUID
+ volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
+ self.assertTrue('encryption_key_id' in volumes.c)
+ self.assertTrue(isinstance(volumes.c.encryption_key_id.type,
+ sqlalchemy.types.VARCHAR))
+
+ snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
+ self.assertTrue('encryption_key_id' in snapshots.c)
+ self.assertTrue(isinstance(snapshots.c.encryption_key_id.type,
+ sqlalchemy.types.VARCHAR))
+ self.assertTrue('volume_type_id' in snapshots.c)
+ self.assertTrue(isinstance(snapshots.c.volume_type_id.type,
+ sqlalchemy.types.VARCHAR))
+
+ # encryption types table
+ encryption = sqlalchemy.Table('encryption',
+ metadata,
+ autoload=True)
+ self.assertTrue(isinstance(encryption.c.volume_type_id.type,
+ sqlalchemy.types.VARCHAR))
+ self.assertTrue(isinstance(encryption.c.cipher.type,
+ sqlalchemy.types.VARCHAR))
+ self.assertTrue(isinstance(encryption.c.key_size.type,
+ sqlalchemy.types.INTEGER))
+ self.assertTrue(isinstance(encryption.c.provider.type,
+ sqlalchemy.types.VARCHAR))
+
+ # downgrade schema
+ migration_api.downgrade(engine, TestMigrations.REPOSITORY, 16)
+ metadata = sqlalchemy.schema.MetaData()
+ metadata.bind = engine
+
+ volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
+ self.assertTrue('encryption_key_id' not in volumes.c)
+
+ snapshots = sqlalchemy.Table('snapshots', metadata, autoload=True)
+ self.assertTrue('encryption_key_id' not in snapshots.c)
+
+ self.assertFalse(engine.dialect.has_table(engine.connect(),
+ 'encryption'))
class VolumeTypeQuotaEngineTestCase(test.TestCase):
def test_default_resources(self):
+ def fake_vtga(context, inactive=False, filters=None):
+ return {}
+ self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
+
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(engine.resource_names,
['gigabytes', 'snapshots', 'volumes'])
ctx = context.RequestContext('admin', 'admin', is_admin=True)
vtype = db.volume_type_create(ctx, {'name': 'type1'})
vtype2 = db.volume_type_create(ctx, {'name': 'type_2'})
+
+ def fake_vtga(context, inactive=False, filters=None):
+ return {
+ 'type1': {
+ 'id': vtype['id'],
+ 'name': 'type1',
+ 'extra_specs': {},
+ },
+ 'type_2': {
+ 'id': vtype['id'],
+ 'name': 'type_2',
+ 'extra_specs': {},
+ },
+ }
+ self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
+
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(engine.resource_names,
['gigabytes', 'gigabytes_type1', 'gigabytes_type_2',
def test_get_defaults(self):
# Use our pre-defined resources
self._stub_quota_class_get_default()
+ self._stub_volume_type_get_all()
result = self.driver.get_defaults(None, quota.QUOTAS.resources)
self.assertEqual(
gigabytes=1000,)
self.stubs.Set(db, 'quota_class_get_default', fake_qcgd)
+ def _stub_volume_type_get_all(self):
+ def fake_vtga(context, inactive=False, filters=None):
+ return {}
+ self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
+
def _stub_quota_class_get_all_by_name(self):
# Stub out quota_class_get_all_by_name
def fake_qcgabn(context, quota_class):
def test_get_class_quotas(self):
self._stub_quota_class_get_all_by_name()
+ self._stub_volume_type_get_all()
result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
'test_class')
def test_get_project_quotas(self):
self._stub_get_by_project()
+ self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project')
def test_get_project_quotas_alt_context_no_class(self):
self._stub_get_by_project()
+ self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('other_project', 'other_class'),
quota.QUOTAS.resources, 'test_project')
def test_get_project_quotas_alt_context_with_class(self):
self._stub_get_by_project()
+ self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('other_project', 'other_class'),
quota.QUOTAS.resources, 'test_project', quota_class='test_class')
def test_get_project_quotas_no_defaults(self):
self._stub_get_by_project()
+ self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project', defaults=False)
def test_get_project_quotas_no_usages(self):
self._stub_get_by_project()
+ self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project', usages=False)
from cinder import db
from cinder import exception
from cinder.image import image_utils
+from cinder import keymgr
from cinder.openstack.common import importutils
from cinder.openstack.common.notifier import api as notifier_api
from cinder.openstack.common.notifier import test_notifier
from cinder import test
from cinder.tests import conf_fixture
from cinder.tests.image import fake as fake_image
+from cinder.tests.keymgr import fake as fake_keymgr
+import cinder.volume
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
volume = self._create_volume()
volume_id = volume['id']
+ self.assertIsNone(volume['encryption_key_id'])
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
'name',
'description')
self.assertEquals(volume['volume_type_id'], None)
+ self.assertEquals(volume['encryption_key_id'], None)
# Create default volume type
vol_type = conf_fixture.def_vol_type
'name',
'description')
self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertIsNone(volume['encryption_key_id'])
# Create volume with specific volume type
vol_type = 'test'
volume_type=db_vol_type)
self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ def test_create_volume_with_encrypted_volume_type(self):
+ self.stubs.Set(keymgr, "API", fake_keymgr.fake_api)
+
+ volume_api = cinder.volume.api.API()
+
+ db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'LUKS')
+
+ volume = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description',
+ volume_type=db_vol_type)
+ self.assertEquals(volume['volume_type_id'], db_vol_type.get('id'))
+ self.assertIsNotNone(volume['encryption_key_id'])
+
def test_delete_busy_volume(self):
"""Test volume survives deletion if driver reports it as busy."""
volume = self._create_volume()
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])
+ def test_create_volume_from_snapshot_with_encryption(self):
+ """Test volume can be created from a snapshot of
+ an encrypted volume.
+ """
+ self.stubs.Set(keymgr, 'API', fake_keymgr.fake_api)
+
+ volume_api = cinder.volume.api.API()
+
+ db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'LUKS')
+ volume_src = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description',
+ volume_type=db_vol_type)
+ snapshot_ref = volume_api.create_snapshot_force(self.context,
+ volume_src,
+ 'name',
+ 'description')
+ snapshot_ref['status'] = 'available' # status must be available
+ volume_dst = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description',
+ snapshot=snapshot_ref)
+ self.assertEqual(volume_dst['id'],
+ db.volume_get(
+ context.get_admin_context(),
+ volume_dst['id']).id)
+ self.assertEqual(snapshot_ref['id'],
+ db.volume_get(context.get_admin_context(),
+ volume_dst['id']).snapshot_id)
+
+ # ensure encryption keys match
+ self.assertIsNotNone(volume_src['encryption_key_id'])
+ self.assertIsNotNone(volume_dst['encryption_key_id'])
+
+ key_manager = volume_api.key_manager # must use *same* key manager
+ volume_src_key = key_manager.get_key(self.context,
+ volume_src['encryption_key_id'])
+ volume_dst_key = key_manager.get_key(self.context,
+ volume_dst['encryption_key_id'])
+ self.assertEqual(volume_src_key, volume_dst_key)
+
+ def test_create_volume_from_encrypted_volume(self):
+ """Test volume can be created from an encrypted volume."""
+ self.stubs.Set(keymgr, 'API', fake_keymgr.fake_api)
+
+ volume_api = cinder.volume.api.API()
+
+ db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
+ 'LUKS')
+ volume_src = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description',
+ volume_type=db_vol_type)
+ volume_src['status'] = 'available' # status must be available
+ volume_dst = volume_api.create(self.context,
+ 1,
+ 'name',
+ 'description',
+ source_volume=volume_src)
+ self.assertEqual(volume_dst['id'],
+ db.volume_get(context.get_admin_context(),
+ volume_dst['id']).id)
+ self.assertEqual(volume_src['id'],
+ db.volume_get(context.get_admin_context(),
+ volume_dst['id']).source_volid)
+
+ # ensure encryption keys match
+ self.assertIsNotNone(volume_src['encryption_key_id'])
+ self.assertIsNotNone(volume_dst['encryption_key_id'])
+
+ key_manager = volume_api.key_manager # must use *same* key manager
+ volume_src_key = key_manager.get_key(self.context,
+ volume_src['encryption_key_id'])
+ volume_dst_key = key_manager.get_key(self.context,
+ volume_dst['encryption_key_id'])
+ self.assertEqual(volume_src_key, volume_dst_key)
+
def test_create_volume_from_snapshot_fail_bad_size(self):
"""Test volume can't be created from snapshot with bad volume size."""
volume_api = cinder.volume.api.API()
{"key1": "val1", "key2": "val2", "key3": "val3"})
self.assertEqual(vol_types['type3']['extra_specs'],
{"key1": "val1", "key3": "val3", "key4": "val4"})
+
+ def test_is_encrypted(self):
+ volume_type = volume_types.create(self.ctxt, "type1")
+ volume_type_id = volume_type.get('id')
+ self.assertFalse(volume_types.is_encrypted(self.ctxt, volume_type_id))
+
+ encryption = {
+ 'control_location': 'front-end',
+ 'provider': 'fake_provider',
+ }
+ db_api.volume_type_encryption_update_or_create(self.ctxt,
+ volume_type_id,
+ encryption)
+ self.assertTrue(volume_types.is_encrypted(self.ctxt, volume_type_id))
from cinder.db import base
from cinder import exception
from cinder.image import glance
+from cinder import keymgr
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
self.availability_zone_names = ()
+ self.key_manager = keymgr.API()
super(API, self).__init__(db_driver)
def _valid_availabilty_zone(self, availability_zone):
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
- scheduler_hints=None):
+ scheduler_hints=None, backup_source_volume=None):
def check_volume_az_zone(availability_zone):
try:
'availability_zone': availability_zone,
'source_volume': source_volume,
'scheduler_hints': scheduler_hints,
+ 'key_manager': self.key_manager,
+ 'backup_source_volume': backup_source_volume,
}
(flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
msg = _("Volume still has %d dependent snapshots") % len(snapshots)
raise exception.InvalidVolume(reason=msg)
+ # If the volume is encrypted, delete its encryption key from the key
+ # manager. This operation makes volume deletion an irreversible process
+ # because the volume cannot be decrypted without its key.
+ encryption_key_id = volume.get('encryption_key_id', None)
+ if encryption_key_id is not None:
+ self.key_manager.delete_key(encryption_key_id)
+
now = timeutils.utcnow()
self.db.volume_update(context, volume_id, {'status': 'deleting',
'terminated_at': now})
'volume_size': volume['size'],
'display_name': name,
'display_description': description,
+ 'volume_type_id': volume['volume_type_id'],
+ 'encryption_key_id': volume['encryption_key_id'],
'metadata': metadata}
try:
properties['physical_block_size'] = physical_block_size
properties['logical_block_size'] = logical_block_size
+ encryption_key_id = volume.get('encryption_key_id', None)
+ properties['encrypted'] = encryption_key_id is not None
+
return properties
def _run_iscsiadm(self, iscsi_properties, iscsi_command, **kwargs):
# saved to durable storage in the future so that the flow can be
# reconstructed elsewhere and continued).
self.provides.update(['availability_zone', 'size', 'snapshot_id',
- 'source_volid', 'volume_type', 'volume_type_id'])
+ 'source_volid', 'volume_type', 'volume_type_id',
+ 'encryption_key_id'])
# This task requires the following inputs to operate (provided
# automatically to __call__(). This is done so that the flow can
# be reconstructed elsewhere and continue running (in the future).
# mostly automatic).
self.requires.update(['availability_zone', 'image_id', 'metadata',
'size', 'snapshot', 'source_volume',
- 'volume_type'])
+ 'volume_type', 'key_manager',
+ 'backup_source_volume'])
self.image_service = image_service
self.az_check_functor = az_check_functor
if not self.az_check_functor:
return availability_zone
+ def _get_encryption_key_id(self, key_manager, context, volume_type_id,
+ snapshot, source_volume, backup_source_volume):
+ encryption_key_id = None
+ if volume_types.is_encrypted(context, volume_type_id):
+ if snapshot is not None: # creating from snapshot
+ encryption_key_id = snapshot['encryption_key_id']
+ elif source_volume is not None: # cloning volume
+ encryption_key_id = source_volume['encryption_key_id']
+ elif backup_source_volume is not None: # creating from backup
+ encryption_key_id = backup_source_volume['encryption_key_id']
+
+ # NOTE(joel-coffman): References to the encryption key should *not*
+ # be copied because the key is deleted when the volume is deleted.
+ # Clone the existing key and associate a separate -- but
+ # identical -- key with each volume.
+ if encryption_key_id is not None:
+ encryption_key_id = key_manager.copy_key(context,
+ encryption_key_id)
+ else:
+ encryption_key_id = key_manager.create_key(context)
+
+ return encryption_key_id
+
+ def _get_volume_type_id(self, volume_type, source_volume, snapshot,
+ backup_source_volume):
+ volume_type_id = None
+ if not volume_type and source_volume:
+ volume_type_id = source_volume['volume_type_id']
+ elif snapshot is not None:
+ if volume_type:
+ current_volume_type_id = volume_type.get('id')
+ if (current_volume_type_id !=
+ snapshot['volume_type_id']):
+ msg = _("Volume type will be changed to "
+ "be the same as the source volume.")
+ LOG.warn(msg)
+ volume_type_id = snapshot['volume_type_id']
+ elif backup_source_volume is not None:
+ volume_type_id = backup_source_volume['volume_type_id']
+ else:
+ volume_type_id = volume_type.get('id')
+
+ return volume_type_id
+
def __call__(self, context, size, snapshot, image_id, source_volume,
- availability_zone, volume_type, metadata):
+ availability_zone, volume_type, metadata,
+ key_manager, backup_source_volume):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
snapshot,
source_volume)
- if not volume_type and not source_volume:
+ # TODO(joel-coffman): This special handling of snapshots to ensure that
+ # their volume type matches the source volume is too convoluted. We
+ # should copy encryption metadata from the encrypted volume type to the
+ # volume upon creation and propogate that information to each snapshot.
+ # This strategy avoid any dependency upon the encrypted volume type.
+ if not volume_type and not source_volume and not snapshot:
volume_type = volume_types.get_default_volume_type()
- if not volume_type and source_volume:
- volume_type_id = source_volume['volume_type_id']
- else:
- volume_type_id = volume_type.get('id')
+
+ volume_type_id = self._get_volume_type_id(volume_type,
+ source_volume, snapshot,
+ backup_source_volume)
+
+ encryption_key_id = self._get_encryption_key_id(key_manager,
+ context,
+ volume_type_id,
+ snapshot,
+ source_volume,
+ backup_source_volume)
self._check_metadata_properties(metadata)
'availability_zone': availability_zone,
'volume_type': volume_type,
'volume_type_id': volume_type_id,
+ 'encryption_key_id': encryption_key_id,
}
self.db = db
self.requires.update(['availability_zone', 'description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
- 'source_volid', 'volume_type_id'])
+ 'source_volid', 'volume_type_id',
+ 'encryption_key_id'])
self.provides.update(['volume_properties', 'volume_id'])
def __call__(self, context, **kwargs):
'project_id': context.project_id,
'status': 'creating',
'attach_status': 'detached',
+ 'encryption_key_id': kwargs.pop('encryption_key_id'),
# Rename these to the internal name.
'display_description': kwargs.pop('description'),
'display_name': kwargs.pop('name'),
return False
else:
return extra_specs
+
+
+def is_encrypted(context, volume_type_id):
+ if volume_type_id is None:
+ return False
+
+ encryption = db.volume_type_encryption_get(context, volume_type_id)
+ return encryption is not None
"volume_extension:types_manage": [["rule:admin_api"]],
"volume_extension:types_extra_specs": [["rule:admin_api"]],
+ "volume_extension:volume_type_encryption": [["rule:admin_api"]],
+ "volume_extension:volume_encryption_metadata": [["rule:admin_api"]],
"volume_extension:extended_snapshot_attributes": [],
"volume_extension:volume_image_metadata": [],