From: Anton Arefiev Date: Mon, 9 Feb 2015 13:30:45 +0000 (+0200) Subject: Add entry create and cast tasks to manage workflow X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=a1e4ad9ff2e9366a5f41b176b5a88d451079557c;p=openstack-build%2Fcinder-build.git Add entry create and cast tasks to manage workflow This change adds manage existing api task in flow. The task is used in the volume api to provide full value task flow for manage existing process. All errors occurred during manage flow set volume to 'error' state. Entry creating moved from volume api to EntryCreateTask. Also added ManageCastTask to provide manage process to scheduler. Related-Bug: #1364550 Change-Id: I12a4311953c1c86d584b5bf2fe2888e5b5127d43 --- diff --git a/cinder/exception.py b/cinder/exception.py index 56aa9a89d..127504f75 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -647,6 +647,11 @@ class ManageExistingInvalidReference(CinderException): "reference %(existing_ref)s: %(reason)s") +class ManageExistingAlreadyManaged(CinderException): + message = _("Unable to manage existing volume. " + "Volume %(volume_ref)s already managed.") + + class ReplicationError(CinderException): message = _("Volume %(volume_id)s replication " "error: %(reason)s") diff --git a/cinder/tests/unit/volume/flows/__init__.py b/cinder/tests/unit/volume/flows/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cinder/tests/unit/volume/flows/fake_volume_api.py b/cinder/tests/unit/volume/flows/fake_volume_api.py new file mode 100644 index 000000000..d424758c1 --- /dev/null +++ b/cinder/tests/unit/volume/flows/fake_volume_api.py @@ -0,0 +1,62 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class FakeVolumeAPI(object): + def __init__(self, expected_spec, test_inst): + self.expected_spec = expected_spec + self.test_inst = test_inst + + def create_volume(self, ctxt, volume, host, + request_spec, filter_properties, + allow_reschedule=True, + snapshot_id=None, image_id=None, + source_volid=None, + source_replicaid=None): + + self.test_inst.assertEqual(self.expected_spec, request_spec) + self.test_inst.assertEqual(request_spec['source_volid'], source_volid) + self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id) + self.test_inst.assertEqual(request_spec['image_id'], image_id) + self.test_inst.assertEqual(request_spec['source_replicaid'], + source_replicaid) + + +class FakeSchedulerRpcAPI(object): + def __init__(self, expected_spec, test_inst): + self.expected_spec = expected_spec + self.test_inst = test_inst + + def create_volume(self, ctxt, volume, volume_ref, snapshot_id=None, + image_id=None, request_spec=None, + filter_properties=None): + + self.test_inst.assertEqual(self.expected_spec, request_spec) + + def manage_existing(self, context, volume_topic, volume_id, + request_spec=None): + self.test_inst.assertEqual(self.expected_spec, request_spec) + + +class FakeDb(object): + + def volume_get(self, *args, **kwargs): + return {'host': 'barf'} + + def volume_update(self, *args, **kwargs): + return {'host': 'farb'} + + def snapshot_get(self, *args, **kwargs): + return {'volume_id': 1} + + def consistencygroup_get(self, *args, **kwargs): + return {'consistencygroup_id': 1} diff --git a/cinder/tests/unit/test_create_volume_flow.py b/cinder/tests/unit/volume/flows/test_create_volume_flow.py similarity index 69% rename from cinder/tests/unit/test_create_volume_flow.py rename to cinder/tests/unit/volume/flows/test_create_volume_flow.py index 2847be312..5ec263d0b 100644 --- a/cinder/tests/unit/test_create_volume_flow.py +++ b/cinder/tests/unit/volume/flows/test_create_volume_flow.py @@ -14,8 +14,6 @@ # under the License. """ Tests for create_volume TaskFlow """ -import time - import mock from cinder import context @@ -23,57 +21,11 @@ from cinder import exception from cinder import test from cinder.tests.unit import fake_snapshot from cinder.tests.unit import fake_volume +from cinder.tests.unit.volume.flows import fake_volume_api from cinder.volume.flows.api import create_volume from cinder.volume.flows.manager import create_volume as create_volume_manager -class fake_scheduler_rpc_api(object): - def __init__(self, expected_spec, test_inst): - self.expected_spec = expected_spec - self.test_inst = test_inst - - def create_volume(self, ctxt, topic, volume_id, snapshot_id=None, - image_id=None, request_spec=None, - filter_properties=None): - - self.test_inst.assertEqual(self.expected_spec, request_spec) - - -class fake_volume_api(object): - def __init__(self, expected_spec, test_inst): - self.expected_spec = expected_spec - self.test_inst = test_inst - - def create_volume(self, ctxt, volume, host, - request_spec, filter_properties, - allow_reschedule=True, - snapshot_id=None, image_id=None, - source_volid=None, - source_replicaid=None): - - self.test_inst.assertEqual(self.expected_spec, request_spec) - self.test_inst.assertEqual(request_spec['source_volid'], source_volid) - self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id) - self.test_inst.assertEqual(request_spec['image_id'], image_id) - self.test_inst.assertEqual(request_spec['source_replicaid'], - source_replicaid) - - -class fake_db(object): - - def volume_get(self, *args, **kwargs): - return {'host': 'barf'} - - def volume_update(self, *args, **kwargs): - return {'host': 'farb'} - - def snapshot_get(self, *args, **kwargs): - return {'volume_id': 1} - - def consistencygroup_get(self, *args, **kwargs): - return {'consistencygroup_id': 1} - - class CreateVolumeFlowTestCase(test.TestCase): def time_inc(self): @@ -88,9 +40,9 @@ class CreateVolumeFlowTestCase(test.TestCase): # Ensure that time.time() always returns more than the last time it was # called to avoid div by zero errors. self.counter = float(0) - self.stubs.Set(time, 'time', self.time_inc) - def test_cast_create_volume(self): + @mock.patch('time.time', side_effect=time_inc) + def test_cast_create_volume(self, mock_time): props = {} spec = {'volume_id': None, @@ -101,10 +53,11 @@ class CreateVolumeFlowTestCase(test.TestCase): 'consistencygroup_id': None, 'cgsnapshot_id': None} + # Fake objects assert specs task = create_volume.VolumeCastTask( - fake_scheduler_rpc_api(spec, self), - fake_volume_api(spec, self), - fake_db()) + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeVolumeAPI(spec, self), + fake_volume_api.FakeDb()) task._cast_create_volume(self.ctxt, spec, props) @@ -116,10 +69,11 @@ class CreateVolumeFlowTestCase(test.TestCase): 'consistencygroup_id': 5, 'cgsnapshot_id': None} + # Fake objects assert specs task = create_volume.VolumeCastTask( - fake_scheduler_rpc_api(spec, self), - fake_volume_api(spec, self), - fake_db()) + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeVolumeAPI(spec, self), + fake_volume_api.FakeDb()) task._cast_create_volume(self.ctxt, spec, props) diff --git a/cinder/tests/unit/volume/flows/test_manage_volume_flow.py b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py new file mode 100644 index 000000000..be32af933 --- /dev/null +++ b/cinder/tests/unit/volume/flows/test_manage_volume_flow.py @@ -0,0 +1,70 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" Tests for manage_existing TaskFlow """ + +import mock + +from cinder import context +from cinder import test +from cinder.tests.unit.volume.flows import fake_volume_api +from cinder.volume.flows.api import manage_existing + + +class ManageVolumeFlowTestCase(test.TestCase): + + def setUp(self): + super(ManageVolumeFlowTestCase, self).setUp() + self.ctxt = context.get_admin_context() + self.counter = float(0) + + def test_cast_manage_existing(self): + + volume = mock.MagicMock(return_value=None) + spec = { + 'name': 'name', + 'description': 'description', + 'host': 'host', + 'ref': 'ref', + 'volume_type': 'volume_type', + 'metadata': 'metadata', + 'availability_zone': 'availability_zone', + 'bootable': 'bootable'} + + # Fake objects assert specs + task = manage_existing.ManageCastTask( + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeDb()) + + create_what = spec.copy() + create_what.update({'volume': volume}) + task.execute(self.ctxt, **create_what) + + volume = mock.MagicMock(return_value={'id': 1}) + + spec = { + 'name': 'name', + 'description': 'description', + 'host': 'host', + 'ref': 'ref', + 'volume_type': 'volume_type', + 'metadata': 'metadata', + 'availability_zone': 'availability_zone', + 'bootable': 'bootable'} + + # Fake objects assert specs + task = manage_existing.ManageCastTask( + fake_volume_api.FakeSchedulerRpcAPI(spec, self), + fake_volume_api.FakeDb()) + + create_what = spec.copy() + create_what.update({'volume': volume}) + task.execute(self.ctxt, **create_what) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 11abff87e..ebb1511fc 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -45,6 +45,7 @@ from cinder import quota_utils from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import utils from cinder.volume.flows.api import create_volume +from cinder.volume.flows.api import manage_existing from cinder.volume import qos_specs from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import utils as volume_utils @@ -1451,36 +1452,36 @@ class API(base.Base): LOG.error(_LE('Unable to find service for given host.')) availability_zone = service.get('availability_zone') - volume_type_id = volume_type['id'] if volume_type else None - volume_properties = { - 'size': 0, - 'user_id': context.user_id, - 'project_id': context.project_id, - 'status': 'creating', - 'attach_status': 'detached', - # Rename these to the internal name. - 'display_description': description, - 'display_name': name, + manage_what = { + 'context': context, + 'name': name, + 'description': description, 'host': host, - 'availability_zone': availability_zone, - 'volume_type_id': volume_type_id, + 'ref': ref, + 'volume_type': volume_type, 'metadata': metadata, - 'bootable': bootable + 'availability_zone': availability_zone, + 'bootable': bootable, } - # Call the scheduler to ensure that the host exists and that it can - # accept the volume - volume = self.db.volume_create(context, volume_properties) - request_spec = {'volume_properties': volume, - 'volume_type': volume_type, - 'volume_id': volume['id'], - 'ref': ref} - self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic, - volume['id'], - request_spec=request_spec) - LOG.info(_LI("Manage volume request issued successfully."), - resource=volume) - return volume + try: + flow_engine = manage_existing.get_flow(self.scheduler_rpcapi, + self.db, + manage_what) + except Exception: + msg = _('Failed to manage api volume flow.') + LOG.exception(msg) + raise exception.CinderException(msg) + + # Attaching this listener will capture all of the notifications that + # taskflow sends out and redirect them to a more useful log for + # cinder's debugging (or error reporting) usage. + with flow_utils.DynamicLogListener(flow_engine, logger=LOG): + flow_engine.run() + vol_ref = flow_engine.storage.fetch('volume') + LOG.info(_LI("Manage volume request issued successfully."), + resource=vol_ref) + return vol_ref class HostAPI(base.Base): diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py new file mode 100644 index 000000000..6b9017bd8 --- /dev/null +++ b/cinder/volume/flows/api/manage_existing.py @@ -0,0 +1,153 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_config import cfg +from oslo_log import log as logging +import taskflow.engines +from taskflow.patterns import linear_flow +from taskflow.types import failure as ft + +from cinder import exception +from cinder import flow_utils +from cinder.i18n import _LE +from cinder.volume.flows import common + +LOG = logging.getLogger(__name__) + +ACTION = 'volume:manage_existing' +CONF = cfg.CONF + + +class EntryCreateTask(flow_utils.CinderTask): + """Creates an entry for the given volume creation in the database. + + Reversion strategy: remove the volume_id created from the database. + """ + default_provides = set(['volume_properties', 'volume']) + + def __init__(self, db): + requires = ['availability_zone', 'description', 'metadata', + 'name', 'host', 'bootable', 'volume_type', 'ref'] + super(EntryCreateTask, self).__init__(addons=[ACTION], + requires=requires) + self.db = db + + def execute(self, context, **kwargs): + """Creates a database entry for the given inputs and returns details. + + Accesses the database and creates a new entry for the to be created + volume using the given volume properties which are extracted from the + input kwargs. + """ + volume_type = kwargs.pop('volume_type') + volume_type_id = volume_type['id'] if volume_type else None + + volume_properties = { + 'size': 0, + 'user_id': context.user_id, + 'project_id': context.project_id, + 'status': 'creating', + 'attach_status': 'detached', + # Rename these to the internal name. + 'display_description': kwargs.pop('description'), + 'display_name': kwargs.pop('name'), + 'host': kwargs.pop('host'), + 'availability_zone': kwargs.pop('availability_zone'), + 'volume_type_id': volume_type_id, + 'metadata': kwargs.pop('metadata'), + 'bootable': kwargs.pop('bootable'), + } + + volume = self.db.volume_create(context, volume_properties) + + return { + 'volume_properties': volume_properties, + # NOTE(harlowja): it appears like further usage of this volume + # result actually depend on it being a sqlalchemy object and not + # just a plain dictionary so that's why we are storing this here. + # + # In the future where this task results can be serialized and + # restored automatically for continued running we will need to + # resolve the serialization & recreation of this object since raw + # sqlalchemy objects can't be serialized. + 'volume': volume, + } + + def revert(self, context, result, optional_args, **kwargs): + # We never produced a result and therefore can't destroy anything. + if isinstance(result, ft.Failure): + return + + vol_id = result['volume_id'] + try: + self.db.volume_destroy(context.elevated(), vol_id) + except exception.CinderException: + LOG.exception(_LE("Failed destroying volume entry: %s."), vol_id) + + +class ManageCastTask(flow_utils.CinderTask): + """Performs a volume manage cast to the scheduler and to the volume manager. + + This which will signal a transition of the api workflow to another child + and/or related workflow. + """ + + def __init__(self, scheduler_rpcapi, db): + requires = ['volume', 'volume_properties', 'volume_type', 'ref'] + super(ManageCastTask, self).__init__(addons=[ACTION], + requires=requires) + self.scheduler_rpcapi = scheduler_rpcapi + self.db = db + + def execute(self, context, **kwargs): + volume = kwargs.pop('volume') + request_spec = kwargs.copy() + + # Call the scheduler to ensure that the host exists and that it can + # accept the volume + self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic, + volume['id'], + request_spec=request_spec) + + def revert(self, context, result, flow_failures, **kwargs): + # Restore the source volume status and set the volume to error status. + volume_id = kwargs['volume_id'] + common.error_out_volume(context, self.db, volume_id) + LOG.error(_LE("Volume %s: manage failed."), volume_id) + exc_info = False + if all(flow_failures[-1].exc_info): + exc_info = flow_failures[-1].exc_info + LOG.error(_LE('Unexpected build error:'), exc_info=exc_info) + + +def get_flow(scheduler_rpcapi, db_api, create_what): + """Constructs and returns the api entrypoint flow. + + This flow will do the following: + + 1. Inject keys & values for dependent tasks. + 2. Extracts and validates the input keys & values. + 3. Creates the database entry. + 4. Casts to volume manager and scheduler for further processing. + """ + + flow_name = ACTION.replace(":", "_") + "_api" + api_flow = linear_flow.Flow(flow_name) + + # This will cast it out to either the scheduler or volume manager via + # the rpc apis provided. + api_flow.add(EntryCreateTask(db_api), + ManageCastTask(scheduler_rpcapi, db_api)) + + # Now load (but do not run) the flow using the provided initial data. + return taskflow.engines.load(api_flow, store=create_what) diff --git a/tox.ini b/tox.ini index ce92a0bc5..3a9653657 100644 --- a/tox.ini +++ b/tox.ini @@ -49,7 +49,6 @@ commands = cinder.tests.unit.test_cmd \ cinder.tests.unit.test_conf \ cinder.tests.unit.test_context \ - cinder.tests.unit.test_create_volume_flow \ cinder.tests.unit.test_db_api \ cinder.tests.unit.test_dellfc \ cinder.tests.unit.test_dellsc \ @@ -93,7 +92,8 @@ commands = cinder.tests.unit.test_volume_rpcapi \ cinder.tests.unit.test_volume_types \ cinder.tests.unit.test_volume_types_extra_specs \ - cinder.tests.unit.test_volume_utils + cinder.tests.unit.test_volume_utils \ + cinder.tests.unit.volume.flows.test_create_volume_flow [testenv:pep8] commands =