]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add entry create and cast tasks to manage workflow
authorAnton Arefiev <aarefiev@mirantis.com>
Mon, 9 Feb 2015 13:30:45 +0000 (15:30 +0200)
committerAnton Arefiev <aarefiev@mirantis.com>
Mon, 27 Jul 2015 16:20:22 +0000 (19:20 +0300)
This change adds manage existing api task in flow. The task
is used in the volume api to provide full value task flow for
manage existing process. All errors occurred during manage
flow set volume to 'error' state.

Entry creating moved from volume api to EntryCreateTask. Also
added ManageCastTask to provide manage process to scheduler.

Related-Bug: #1364550
Change-Id: I12a4311953c1c86d584b5bf2fe2888e5b5127d43

cinder/exception.py
cinder/tests/unit/volume/flows/__init__.py [new file with mode: 0644]
cinder/tests/unit/volume/flows/fake_volume_api.py [new file with mode: 0644]
cinder/tests/unit/volume/flows/test_create_volume_flow.py [moved from cinder/tests/unit/test_create_volume_flow.py with 69% similarity]
cinder/tests/unit/volume/flows/test_manage_volume_flow.py [new file with mode: 0644]
cinder/volume/api.py
cinder/volume/flows/api/manage_existing.py [new file with mode: 0644]
tox.ini

index 56aa9a89d15de8ab800deffb0f3294e4a0bee73f..127504f75d447e3eaa0cefe5c103f62523327303 100644 (file)
@@ -647,6 +647,11 @@ class ManageExistingInvalidReference(CinderException):
                 "reference %(existing_ref)s: %(reason)s")
 
 
+class ManageExistingAlreadyManaged(CinderException):
+    message = _("Unable to manage existing volume. "
+                "Volume %(volume_ref)s already managed.")
+
+
 class ReplicationError(CinderException):
     message = _("Volume %(volume_id)s replication "
                 "error: %(reason)s")
diff --git a/cinder/tests/unit/volume/flows/__init__.py b/cinder/tests/unit/volume/flows/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/cinder/tests/unit/volume/flows/fake_volume_api.py b/cinder/tests/unit/volume/flows/fake_volume_api.py
new file mode 100644 (file)
index 0000000..d424758
--- /dev/null
@@ -0,0 +1,62 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+class FakeVolumeAPI(object):
+    def __init__(self, expected_spec, test_inst):
+        self.expected_spec = expected_spec
+        self.test_inst = test_inst
+
+    def create_volume(self, ctxt, volume, host,
+                      request_spec, filter_properties,
+                      allow_reschedule=True,
+                      snapshot_id=None, image_id=None,
+                      source_volid=None,
+                      source_replicaid=None):
+
+        self.test_inst.assertEqual(self.expected_spec, request_spec)
+        self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
+        self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
+        self.test_inst.assertEqual(request_spec['image_id'], image_id)
+        self.test_inst.assertEqual(request_spec['source_replicaid'],
+                                   source_replicaid)
+
+
+class FakeSchedulerRpcAPI(object):
+    def __init__(self, expected_spec, test_inst):
+        self.expected_spec = expected_spec
+        self.test_inst = test_inst
+
+    def create_volume(self, ctxt, volume, volume_ref, snapshot_id=None,
+                      image_id=None, request_spec=None,
+                      filter_properties=None):
+
+        self.test_inst.assertEqual(self.expected_spec, request_spec)
+
+    def manage_existing(self, context, volume_topic, volume_id,
+                        request_spec=None):
+        self.test_inst.assertEqual(self.expected_spec, request_spec)
+
+
+class FakeDb(object):
+
+    def volume_get(self, *args, **kwargs):
+        return {'host': 'barf'}
+
+    def volume_update(self, *args, **kwargs):
+        return {'host': 'farb'}
+
+    def snapshot_get(self, *args, **kwargs):
+        return {'volume_id': 1}
+
+    def consistencygroup_get(self, *args, **kwargs):
+        return {'consistencygroup_id': 1}
similarity index 69%
rename from cinder/tests/unit/test_create_volume_flow.py
rename to cinder/tests/unit/volume/flows/test_create_volume_flow.py
index 2847be3125247fcec11235a9e7410b26c368c211..5ec263d0bcece39a795510447b3d7de958c38ef0 100644 (file)
@@ -14,8 +14,6 @@
 #    under the License.
 """ Tests for create_volume TaskFlow """
 
-import time
-
 import mock
 
 from cinder import context
@@ -23,57 +21,11 @@ from cinder import exception
 from cinder import test
 from cinder.tests.unit import fake_snapshot
 from cinder.tests.unit import fake_volume
+from cinder.tests.unit.volume.flows import fake_volume_api
 from cinder.volume.flows.api import create_volume
 from cinder.volume.flows.manager import create_volume as create_volume_manager
 
 
-class fake_scheduler_rpc_api(object):
-    def __init__(self, expected_spec, test_inst):
-        self.expected_spec = expected_spec
-        self.test_inst = test_inst
-
-    def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
-                      image_id=None, request_spec=None,
-                      filter_properties=None):
-
-        self.test_inst.assertEqual(self.expected_spec, request_spec)
-
-
-class fake_volume_api(object):
-    def __init__(self, expected_spec, test_inst):
-        self.expected_spec = expected_spec
-        self.test_inst = test_inst
-
-    def create_volume(self, ctxt, volume, host,
-                      request_spec, filter_properties,
-                      allow_reschedule=True,
-                      snapshot_id=None, image_id=None,
-                      source_volid=None,
-                      source_replicaid=None):
-
-        self.test_inst.assertEqual(self.expected_spec, request_spec)
-        self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
-        self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
-        self.test_inst.assertEqual(request_spec['image_id'], image_id)
-        self.test_inst.assertEqual(request_spec['source_replicaid'],
-                                   source_replicaid)
-
-
-class fake_db(object):
-
-    def volume_get(self, *args, **kwargs):
-        return {'host': 'barf'}
-
-    def volume_update(self, *args, **kwargs):
-        return {'host': 'farb'}
-
-    def snapshot_get(self, *args, **kwargs):
-        return {'volume_id': 1}
-
-    def consistencygroup_get(self, *args, **kwargs):
-        return {'consistencygroup_id': 1}
-
-
 class CreateVolumeFlowTestCase(test.TestCase):
 
     def time_inc(self):
@@ -88,9 +40,9 @@ class CreateVolumeFlowTestCase(test.TestCase):
         # Ensure that time.time() always returns more than the last time it was
         # called to avoid div by zero errors.
         self.counter = float(0)
-        self.stubs.Set(time, 'time', self.time_inc)
 
-    def test_cast_create_volume(self):
+    @mock.patch('time.time', side_effect=time_inc)
+    def test_cast_create_volume(self, mock_time):
 
         props = {}
         spec = {'volume_id': None,
@@ -101,10 +53,11 @@ class CreateVolumeFlowTestCase(test.TestCase):
                 'consistencygroup_id': None,
                 'cgsnapshot_id': None}
 
+        # Fake objects assert specs
         task = create_volume.VolumeCastTask(
-            fake_scheduler_rpc_api(spec, self),
-            fake_volume_api(spec, self),
-            fake_db())
+            fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+            fake_volume_api.FakeVolumeAPI(spec, self),
+            fake_volume_api.FakeDb())
 
         task._cast_create_volume(self.ctxt, spec, props)
 
@@ -116,10 +69,11 @@ class CreateVolumeFlowTestCase(test.TestCase):
                 'consistencygroup_id': 5,
                 'cgsnapshot_id': None}
 
+        # Fake objects assert specs
         task = create_volume.VolumeCastTask(
-            fake_scheduler_rpc_api(spec, self),
-            fake_volume_api(spec, self),
-            fake_db())
+            fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+            fake_volume_api.FakeVolumeAPI(spec, self),
+            fake_volume_api.FakeDb())
 
         task._cast_create_volume(self.ctxt, spec, props)
 
diff --git a/cinder/tests/unit/volume/flows/test_manage_volume_flow.py b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py
new file mode 100644 (file)
index 0000000..be32af9
--- /dev/null
@@ -0,0 +1,70 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+""" Tests for manage_existing TaskFlow """
+
+import mock
+
+from cinder import context
+from cinder import test
+from cinder.tests.unit.volume.flows import fake_volume_api
+from cinder.volume.flows.api import manage_existing
+
+
+class ManageVolumeFlowTestCase(test.TestCase):
+
+    def setUp(self):
+        super(ManageVolumeFlowTestCase, self).setUp()
+        self.ctxt = context.get_admin_context()
+        self.counter = float(0)
+
+    def test_cast_manage_existing(self):
+
+        volume = mock.MagicMock(return_value=None)
+        spec = {
+            'name': 'name',
+            'description': 'description',
+            'host': 'host',
+            'ref': 'ref',
+            'volume_type': 'volume_type',
+            'metadata': 'metadata',
+            'availability_zone': 'availability_zone',
+            'bootable': 'bootable'}
+
+        # Fake objects assert specs
+        task = manage_existing.ManageCastTask(
+            fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+            fake_volume_api.FakeDb())
+
+        create_what = spec.copy()
+        create_what.update({'volume': volume})
+        task.execute(self.ctxt, **create_what)
+
+        volume = mock.MagicMock(return_value={'id': 1})
+
+        spec = {
+            'name': 'name',
+            'description': 'description',
+            'host': 'host',
+            'ref': 'ref',
+            'volume_type': 'volume_type',
+            'metadata': 'metadata',
+            'availability_zone': 'availability_zone',
+            'bootable': 'bootable'}
+
+        # Fake objects assert specs
+        task = manage_existing.ManageCastTask(
+            fake_volume_api.FakeSchedulerRpcAPI(spec, self),
+            fake_volume_api.FakeDb())
+
+        create_what = spec.copy()
+        create_what.update({'volume': volume})
+        task.execute(self.ctxt, **create_what)
index 11abff87e5c1010e72aef0549263e97b65c1b57b..ebb1511fc9ba6f924035a444a2b2b98c8df59f46 100644 (file)
@@ -45,6 +45,7 @@ from cinder import quota_utils
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import utils
 from cinder.volume.flows.api import create_volume
+from cinder.volume.flows.api import manage_existing
 from cinder.volume import qos_specs
 from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import utils as volume_utils
@@ -1451,36 +1452,36 @@ class API(base.Base):
                     LOG.error(_LE('Unable to find service for given host.'))
             availability_zone = service.get('availability_zone')
 
-        volume_type_id = volume_type['id'] if volume_type else None
-        volume_properties = {
-            'size': 0,
-            'user_id': context.user_id,
-            'project_id': context.project_id,
-            'status': 'creating',
-            'attach_status': 'detached',
-            # Rename these to the internal name.
-            'display_description': description,
-            'display_name': name,
+        manage_what = {
+            'context': context,
+            'name': name,
+            'description': description,
             'host': host,
-            'availability_zone': availability_zone,
-            'volume_type_id': volume_type_id,
+            'ref': ref,
+            'volume_type': volume_type,
             'metadata': metadata,
-            'bootable': bootable
+            'availability_zone': availability_zone,
+            'bootable': bootable,
         }
 
-        # Call the scheduler to ensure that the host exists and that it can
-        # accept the volume
-        volume = self.db.volume_create(context, volume_properties)
-        request_spec = {'volume_properties': volume,
-                        'volume_type': volume_type,
-                        'volume_id': volume['id'],
-                        'ref': ref}
-        self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
-                                              volume['id'],
-                                              request_spec=request_spec)
-        LOG.info(_LI("Manage volume request issued successfully."),
-                 resource=volume)
-        return volume
+        try:
+            flow_engine = manage_existing.get_flow(self.scheduler_rpcapi,
+                                                   self.db,
+                                                   manage_what)
+        except Exception:
+            msg = _('Failed to manage api volume flow.')
+            LOG.exception(msg)
+            raise exception.CinderException(msg)
+
+        # Attaching this listener will capture all of the notifications that
+        # taskflow sends out and redirect them to a more useful log for
+        # cinder's debugging (or error reporting) usage.
+        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+            flow_engine.run()
+            vol_ref = flow_engine.storage.fetch('volume')
+            LOG.info(_LI("Manage volume request issued successfully."),
+                     resource=vol_ref)
+            return vol_ref
 
 
 class HostAPI(base.Base):
diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py
new file mode 100644 (file)
index 0000000..6b9017b
--- /dev/null
@@ -0,0 +1,153 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import taskflow.engines
+from taskflow.patterns import linear_flow
+from taskflow.types import failure as ft
+
+from cinder import exception
+from cinder import flow_utils
+from cinder.i18n import _LE
+from cinder.volume.flows import common
+
+LOG = logging.getLogger(__name__)
+
+ACTION = 'volume:manage_existing'
+CONF = cfg.CONF
+
+
+class EntryCreateTask(flow_utils.CinderTask):
+    """Creates an entry for the given volume creation in the database.
+
+    Reversion strategy: remove the volume_id created from the database.
+    """
+    default_provides = set(['volume_properties', 'volume'])
+
+    def __init__(self, db):
+        requires = ['availability_zone', 'description', 'metadata',
+                    'name', 'host', 'bootable', 'volume_type', 'ref']
+        super(EntryCreateTask, self).__init__(addons=[ACTION],
+                                              requires=requires)
+        self.db = db
+
+    def execute(self, context, **kwargs):
+        """Creates a database entry for the given inputs and returns details.
+
+        Accesses the database and creates a new entry for the to be created
+        volume using the given volume properties which are extracted from the
+        input kwargs.
+        """
+        volume_type = kwargs.pop('volume_type')
+        volume_type_id = volume_type['id'] if volume_type else None
+
+        volume_properties = {
+            'size': 0,
+            'user_id': context.user_id,
+            'project_id': context.project_id,
+            'status': 'creating',
+            'attach_status': 'detached',
+            # Rename these to the internal name.
+            'display_description': kwargs.pop('description'),
+            'display_name': kwargs.pop('name'),
+            'host': kwargs.pop('host'),
+            'availability_zone': kwargs.pop('availability_zone'),
+            'volume_type_id': volume_type_id,
+            'metadata': kwargs.pop('metadata'),
+            'bootable': kwargs.pop('bootable'),
+        }
+
+        volume = self.db.volume_create(context, volume_properties)
+
+        return {
+            'volume_properties': volume_properties,
+            # NOTE(harlowja): it appears like further usage of this volume
+            # result actually depend on it being a sqlalchemy object and not
+            # just a plain dictionary so that's why we are storing this here.
+            #
+            # In the future where this task results can be serialized and
+            # restored automatically for continued running we will need to
+            # resolve the serialization & recreation of this object since raw
+            # sqlalchemy objects can't be serialized.
+            'volume': volume,
+        }
+
+    def revert(self, context, result, optional_args, **kwargs):
+        # We never produced a result and therefore can't destroy anything.
+        if isinstance(result, ft.Failure):
+            return
+
+        vol_id = result['volume_id']
+        try:
+            self.db.volume_destroy(context.elevated(), vol_id)
+        except exception.CinderException:
+            LOG.exception(_LE("Failed destroying volume entry: %s."), vol_id)
+
+
+class ManageCastTask(flow_utils.CinderTask):
+    """Performs a volume manage cast to the scheduler and to the volume manager.
+
+    This which will signal a transition of the api workflow to another child
+    and/or related workflow.
+    """
+
+    def __init__(self, scheduler_rpcapi, db):
+        requires = ['volume', 'volume_properties', 'volume_type', 'ref']
+        super(ManageCastTask, self).__init__(addons=[ACTION],
+                                             requires=requires)
+        self.scheduler_rpcapi = scheduler_rpcapi
+        self.db = db
+
+    def execute(self, context, **kwargs):
+        volume = kwargs.pop('volume')
+        request_spec = kwargs.copy()
+
+        # Call the scheduler to ensure that the host exists and that it can
+        # accept the volume
+        self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
+                                              volume['id'],
+                                              request_spec=request_spec)
+
+    def revert(self, context, result, flow_failures, **kwargs):
+        # Restore the source volume status and set the volume to error status.
+        volume_id = kwargs['volume_id']
+        common.error_out_volume(context, self.db, volume_id)
+        LOG.error(_LE("Volume %s: manage failed."), volume_id)
+        exc_info = False
+        if all(flow_failures[-1].exc_info):
+            exc_info = flow_failures[-1].exc_info
+        LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
+
+
+def get_flow(scheduler_rpcapi, db_api, create_what):
+    """Constructs and returns the api entrypoint flow.
+
+    This flow will do the following:
+
+    1. Inject keys & values for dependent tasks.
+    2. Extracts and validates the input keys & values.
+    3. Creates the database entry.
+    4. Casts to volume manager and scheduler for further processing.
+    """
+
+    flow_name = ACTION.replace(":", "_") + "_api"
+    api_flow = linear_flow.Flow(flow_name)
+
+    # This will cast it out to either the scheduler or volume manager via
+    # the rpc apis provided.
+    api_flow.add(EntryCreateTask(db_api),
+                 ManageCastTask(scheduler_rpcapi, db_api))
+
+    # Now load (but do not run) the flow using the provided initial data.
+    return taskflow.engines.load(api_flow, store=create_what)
diff --git a/tox.ini b/tox.ini
index ce92a0bc536b99f4ddcd0718b3b67e4a7aa724a4..3a965365759500b2499c1a16fe3b79768791a76a 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -49,7 +49,6 @@ commands =
     cinder.tests.unit.test_cmd \
     cinder.tests.unit.test_conf \
     cinder.tests.unit.test_context \
-    cinder.tests.unit.test_create_volume_flow \
     cinder.tests.unit.test_db_api \
     cinder.tests.unit.test_dellfc \
     cinder.tests.unit.test_dellsc \
@@ -93,7 +92,8 @@ commands =
     cinder.tests.unit.test_volume_rpcapi \
     cinder.tests.unit.test_volume_types \
     cinder.tests.unit.test_volume_types_extra_specs \
-    cinder.tests.unit.test_volume_utils
+    cinder.tests.unit.test_volume_utils \
+    cinder.tests.unit.volume.flows.test_create_volume_flow
 
 [testenv:pep8]
 commands =