"reference %(existing_ref)s: %(reason)s")
+class ManageExistingAlreadyManaged(CinderException):
+ message = _("Unable to manage existing volume. "
+ "Volume %(volume_ref)s already managed.")
+
+
class ReplicationError(CinderException):
message = _("Volume %(volume_id)s replication "
"error: %(reason)s")
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class FakeVolumeAPI(object):
+ def __init__(self, expected_spec, test_inst):
+ self.expected_spec = expected_spec
+ self.test_inst = test_inst
+
+ def create_volume(self, ctxt, volume, host,
+ request_spec, filter_properties,
+ allow_reschedule=True,
+ snapshot_id=None, image_id=None,
+ source_volid=None,
+ source_replicaid=None):
+
+ self.test_inst.assertEqual(self.expected_spec, request_spec)
+ self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
+ self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
+ self.test_inst.assertEqual(request_spec['image_id'], image_id)
+ self.test_inst.assertEqual(request_spec['source_replicaid'],
+ source_replicaid)
+
+
+class FakeSchedulerRpcAPI(object):
+ def __init__(self, expected_spec, test_inst):
+ self.expected_spec = expected_spec
+ self.test_inst = test_inst
+
+ def create_volume(self, ctxt, volume, volume_ref, snapshot_id=None,
+ image_id=None, request_spec=None,
+ filter_properties=None):
+
+ self.test_inst.assertEqual(self.expected_spec, request_spec)
+
+ def manage_existing(self, context, volume_topic, volume_id,
+ request_spec=None):
+ self.test_inst.assertEqual(self.expected_spec, request_spec)
+
+
+class FakeDb(object):
+
+ def volume_get(self, *args, **kwargs):
+ return {'host': 'barf'}
+
+ def volume_update(self, *args, **kwargs):
+ return {'host': 'farb'}
+
+ def snapshot_get(self, *args, **kwargs):
+ return {'volume_id': 1}
+
+ def consistencygroup_get(self, *args, **kwargs):
+ return {'consistencygroup_id': 1}
# under the License.
""" Tests for create_volume TaskFlow """
-import time
-
import mock
from cinder import context
from cinder import test
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
+from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import create_volume
from cinder.volume.flows.manager import create_volume as create_volume_manager
-class fake_scheduler_rpc_api(object):
- def __init__(self, expected_spec, test_inst):
- self.expected_spec = expected_spec
- self.test_inst = test_inst
-
- def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
- image_id=None, request_spec=None,
- filter_properties=None):
-
- self.test_inst.assertEqual(self.expected_spec, request_spec)
-
-
-class fake_volume_api(object):
- def __init__(self, expected_spec, test_inst):
- self.expected_spec = expected_spec
- self.test_inst = test_inst
-
- def create_volume(self, ctxt, volume, host,
- request_spec, filter_properties,
- allow_reschedule=True,
- snapshot_id=None, image_id=None,
- source_volid=None,
- source_replicaid=None):
-
- self.test_inst.assertEqual(self.expected_spec, request_spec)
- self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
- self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
- self.test_inst.assertEqual(request_spec['image_id'], image_id)
- self.test_inst.assertEqual(request_spec['source_replicaid'],
- source_replicaid)
-
-
-class fake_db(object):
-
- def volume_get(self, *args, **kwargs):
- return {'host': 'barf'}
-
- def volume_update(self, *args, **kwargs):
- return {'host': 'farb'}
-
- def snapshot_get(self, *args, **kwargs):
- return {'volume_id': 1}
-
- def consistencygroup_get(self, *args, **kwargs):
- return {'consistencygroup_id': 1}
-
-
class CreateVolumeFlowTestCase(test.TestCase):
def time_inc(self):
# Ensure that time.time() always returns more than the last time it was
# called to avoid div by zero errors.
self.counter = float(0)
- self.stubs.Set(time, 'time', self.time_inc)
- def test_cast_create_volume(self):
+ @mock.patch('time.time', side_effect=time_inc)
+ def test_cast_create_volume(self, mock_time):
props = {}
spec = {'volume_id': None,
'consistencygroup_id': None,
'cgsnapshot_id': None}
+ # Fake objects assert specs
task = create_volume.VolumeCastTask(
- fake_scheduler_rpc_api(spec, self),
- fake_volume_api(spec, self),
- fake_db())
+ fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+ fake_volume_api.FakeVolumeAPI(spec, self),
+ fake_volume_api.FakeDb())
task._cast_create_volume(self.ctxt, spec, props)
'consistencygroup_id': 5,
'cgsnapshot_id': None}
+ # Fake objects assert specs
task = create_volume.VolumeCastTask(
- fake_scheduler_rpc_api(spec, self),
- fake_volume_api(spec, self),
- fake_db())
+ fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+ fake_volume_api.FakeVolumeAPI(spec, self),
+ fake_volume_api.FakeDb())
task._cast_create_volume(self.ctxt, spec, props)
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+""" Tests for manage_existing TaskFlow """
+
+import mock
+
+from cinder import context
+from cinder import test
+from cinder.tests.unit.volume.flows import fake_volume_api
+from cinder.volume.flows.api import manage_existing
+
+
+class ManageVolumeFlowTestCase(test.TestCase):
+
+ def setUp(self):
+ super(ManageVolumeFlowTestCase, self).setUp()
+ self.ctxt = context.get_admin_context()
+ self.counter = float(0)
+
+ def test_cast_manage_existing(self):
+
+ volume = mock.MagicMock(return_value=None)
+ spec = {
+ 'name': 'name',
+ 'description': 'description',
+ 'host': 'host',
+ 'ref': 'ref',
+ 'volume_type': 'volume_type',
+ 'metadata': 'metadata',
+ 'availability_zone': 'availability_zone',
+ 'bootable': 'bootable'}
+
+ # Fake objects assert specs
+ task = manage_existing.ManageCastTask(
+ fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+ fake_volume_api.FakeDb())
+
+ create_what = spec.copy()
+ create_what.update({'volume': volume})
+ task.execute(self.ctxt, **create_what)
+
+ volume = mock.MagicMock(return_value={'id': 1})
+
+ spec = {
+ 'name': 'name',
+ 'description': 'description',
+ 'host': 'host',
+ 'ref': 'ref',
+ 'volume_type': 'volume_type',
+ 'metadata': 'metadata',
+ 'availability_zone': 'availability_zone',
+ 'bootable': 'bootable'}
+
+ # Fake objects assert specs
+ task = manage_existing.ManageCastTask(
+ fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+ fake_volume_api.FakeDb())
+
+ create_what = spec.copy()
+ create_what.update({'volume': volume})
+ task.execute(self.ctxt, **create_what)
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils
from cinder.volume.flows.api import create_volume
+from cinder.volume.flows.api import manage_existing
from cinder.volume import qos_specs
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
LOG.error(_LE('Unable to find service for given host.'))
availability_zone = service.get('availability_zone')
- volume_type_id = volume_type['id'] if volume_type else None
- volume_properties = {
- 'size': 0,
- 'user_id': context.user_id,
- 'project_id': context.project_id,
- 'status': 'creating',
- 'attach_status': 'detached',
- # Rename these to the internal name.
- 'display_description': description,
- 'display_name': name,
+ manage_what = {
+ 'context': context,
+ 'name': name,
+ 'description': description,
'host': host,
- 'availability_zone': availability_zone,
- 'volume_type_id': volume_type_id,
+ 'ref': ref,
+ 'volume_type': volume_type,
'metadata': metadata,
- 'bootable': bootable
+ 'availability_zone': availability_zone,
+ 'bootable': bootable,
}
- # Call the scheduler to ensure that the host exists and that it can
- # accept the volume
- volume = self.db.volume_create(context, volume_properties)
- request_spec = {'volume_properties': volume,
- 'volume_type': volume_type,
- 'volume_id': volume['id'],
- 'ref': ref}
- self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
- volume['id'],
- request_spec=request_spec)
- LOG.info(_LI("Manage volume request issued successfully."),
- resource=volume)
- return volume
+ try:
+ flow_engine = manage_existing.get_flow(self.scheduler_rpcapi,
+ self.db,
+ manage_what)
+ except Exception:
+ msg = _('Failed to manage api volume flow.')
+ LOG.exception(msg)
+ raise exception.CinderException(msg)
+
+ # Attaching this listener will capture all of the notifications that
+ # taskflow sends out and redirect them to a more useful log for
+ # cinder's debugging (or error reporting) usage.
+ with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+ flow_engine.run()
+ vol_ref = flow_engine.storage.fetch('volume')
+ LOG.info(_LI("Manage volume request issued successfully."),
+ resource=vol_ref)
+ return vol_ref
class HostAPI(base.Base):
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import taskflow.engines
+from taskflow.patterns import linear_flow
+from taskflow.types import failure as ft
+
+from cinder import exception
+from cinder import flow_utils
+from cinder.i18n import _LE
+from cinder.volume.flows import common
+
+LOG = logging.getLogger(__name__)
+
+ACTION = 'volume:manage_existing'
+CONF = cfg.CONF
+
+
+class EntryCreateTask(flow_utils.CinderTask):
+ """Creates an entry for the given volume creation in the database.
+
+ Reversion strategy: remove the volume_id created from the database.
+ """
+ default_provides = set(['volume_properties', 'volume'])
+
+ def __init__(self, db):
+ requires = ['availability_zone', 'description', 'metadata',
+ 'name', 'host', 'bootable', 'volume_type', 'ref']
+ super(EntryCreateTask, self).__init__(addons=[ACTION],
+ requires=requires)
+ self.db = db
+
+ def execute(self, context, **kwargs):
+ """Creates a database entry for the given inputs and returns details.
+
+ Accesses the database and creates a new entry for the to be created
+ volume using the given volume properties which are extracted from the
+ input kwargs.
+ """
+ volume_type = kwargs.pop('volume_type')
+ volume_type_id = volume_type['id'] if volume_type else None
+
+ volume_properties = {
+ 'size': 0,
+ 'user_id': context.user_id,
+ 'project_id': context.project_id,
+ 'status': 'creating',
+ 'attach_status': 'detached',
+ # Rename these to the internal name.
+ 'display_description': kwargs.pop('description'),
+ 'display_name': kwargs.pop('name'),
+ 'host': kwargs.pop('host'),
+ 'availability_zone': kwargs.pop('availability_zone'),
+ 'volume_type_id': volume_type_id,
+ 'metadata': kwargs.pop('metadata'),
+ 'bootable': kwargs.pop('bootable'),
+ }
+
+ volume = self.db.volume_create(context, volume_properties)
+
+ return {
+ 'volume_properties': volume_properties,
+ # NOTE(harlowja): it appears like further usage of this volume
+ # result actually depend on it being a sqlalchemy object and not
+ # just a plain dictionary so that's why we are storing this here.
+ #
+ # In the future where this task results can be serialized and
+ # restored automatically for continued running we will need to
+ # resolve the serialization & recreation of this object since raw
+ # sqlalchemy objects can't be serialized.
+ 'volume': volume,
+ }
+
+ def revert(self, context, result, optional_args, **kwargs):
+ # We never produced a result and therefore can't destroy anything.
+ if isinstance(result, ft.Failure):
+ return
+
+ vol_id = result['volume_id']
+ try:
+ self.db.volume_destroy(context.elevated(), vol_id)
+ except exception.CinderException:
+ LOG.exception(_LE("Failed destroying volume entry: %s."), vol_id)
+
+
+class ManageCastTask(flow_utils.CinderTask):
+ """Performs a volume manage cast to the scheduler and to the volume manager.
+
+ This which will signal a transition of the api workflow to another child
+ and/or related workflow.
+ """
+
+ def __init__(self, scheduler_rpcapi, db):
+ requires = ['volume', 'volume_properties', 'volume_type', 'ref']
+ super(ManageCastTask, self).__init__(addons=[ACTION],
+ requires=requires)
+ self.scheduler_rpcapi = scheduler_rpcapi
+ self.db = db
+
+ def execute(self, context, **kwargs):
+ volume = kwargs.pop('volume')
+ request_spec = kwargs.copy()
+
+ # Call the scheduler to ensure that the host exists and that it can
+ # accept the volume
+ self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
+ volume['id'],
+ request_spec=request_spec)
+
+ def revert(self, context, result, flow_failures, **kwargs):
+ # Restore the source volume status and set the volume to error status.
+ volume_id = kwargs['volume_id']
+ common.error_out_volume(context, self.db, volume_id)
+ LOG.error(_LE("Volume %s: manage failed."), volume_id)
+ exc_info = False
+ if all(flow_failures[-1].exc_info):
+ exc_info = flow_failures[-1].exc_info
+ LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
+
+
+def get_flow(scheduler_rpcapi, db_api, create_what):
+ """Constructs and returns the api entrypoint flow.
+
+ This flow will do the following:
+
+ 1. Inject keys & values for dependent tasks.
+ 2. Extracts and validates the input keys & values.
+ 3. Creates the database entry.
+ 4. Casts to volume manager and scheduler for further processing.
+ """
+
+ flow_name = ACTION.replace(":", "_") + "_api"
+ api_flow = linear_flow.Flow(flow_name)
+
+ # This will cast it out to either the scheduler or volume manager via
+ # the rpc apis provided.
+ api_flow.add(EntryCreateTask(db_api),
+ ManageCastTask(scheduler_rpcapi, db_api))
+
+ # Now load (but do not run) the flow using the provided initial data.
+ return taskflow.engines.load(api_flow, store=create_what)
cinder.tests.unit.test_cmd \
cinder.tests.unit.test_conf \
cinder.tests.unit.test_context \
- cinder.tests.unit.test_create_volume_flow \
cinder.tests.unit.test_db_api \
cinder.tests.unit.test_dellfc \
cinder.tests.unit.test_dellsc \
cinder.tests.unit.test_volume_rpcapi \
cinder.tests.unit.test_volume_types \
cinder.tests.unit.test_volume_types_extra_specs \
- cinder.tests.unit.test_volume_utils
+ cinder.tests.unit.test_volume_utils \
+ cinder.tests.unit.volume.flows.test_create_volume_flow
[testenv:pep8]
commands =