# under the License.
""" Tests for manage_existing TaskFlow """
-import mock
-
from cinder import context
from cinder import test
+from cinder.tests.unit import fake_volume
from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import manage_existing
self.counter = float(0)
def test_cast_manage_existing(self):
-
- volume = mock.MagicMock(return_value=None)
- spec = {
- 'name': 'name',
- 'description': 'description',
- 'host': 'host',
- 'ref': 'ref',
- 'volume_type': 'volume_type',
- 'metadata': 'metadata',
- 'availability_zone': 'availability_zone',
- 'bootable': 'bootable'}
-
- # Fake objects assert specs
- task = manage_existing.ManageCastTask(
- fake_volume_api.FakeSchedulerRpcAPI(spec, self),
- fake_volume_api.FakeDb())
-
- create_what = spec.copy()
- create_what.update({'volume': volume})
- task.execute(self.ctxt, **create_what)
-
- volume = mock.MagicMock(return_value={'id': 1})
+ volume = fake_volume.fake_volume_type_obj(self.ctxt)
spec = {
'name': 'name',
'volume_type': 'volume_type',
'metadata': 'metadata',
'availability_zone': 'availability_zone',
- 'bootable': 'bootable'}
+ 'bootable': 'bootable',
+ 'volume_id': volume.id,
+ }
# Fake objects assert specs
task = manage_existing.ManageCastTask(
create_what = spec.copy()
create_what.update({'volume': volume})
+ create_what.pop('volume_id')
task.execute(self.ctxt, **create_what)