]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Tests: Split VolumeTestCase into separate classes
authorEric Harney <eharney@redhat.com>
Mon, 13 Jul 2015 15:20:56 +0000 (11:20 -0400)
committerEric Harney <eharney@redhat.com>
Tue, 15 Sep 2015 19:56:46 +0000 (15:56 -0400)
Organize the large VolumeTestCase class into a few
subclasses.

Change-Id: I1538445533bc6c14668177443cbc8f676731896d

cinder/tests/unit/test_volume.py

index a96ee8ef7faef019cd6638500bf4ac1eb9bb0874..0bd76f6c7cb74ed4542f0badfe923f84d6a6061d 100644 (file)
@@ -225,6 +225,32 @@ class AvailabilityZoneTestCase(BaseVolumeTestCase):
             self.assertGreater(volume_api.availability_zones_last_fetched,
                                last_fetched)
 
+    def test_list_availability_zones_enabled_service(self):
+        services = [
+            {'availability_zone': 'ping', 'disabled': 0},
+            {'availability_zone': 'ping', 'disabled': 1},
+            {'availability_zone': 'pong', 'disabled': 0},
+            {'availability_zone': 'pung', 'disabled': 1},
+        ]
+
+        def stub_service_get_all_by_topic(*args, **kwargs):
+            return services
+
+        self.stubs.Set(db, 'service_get_all_by_topic',
+                       stub_service_get_all_by_topic)
+
+        volume_api = cinder.volume.api.API()
+        azs = volume_api.list_availability_zones()
+        azs = list(azs).sort()
+
+        expected = [
+            {'name': 'pung', 'available': False},
+            {'name': 'pong', 'available': True},
+            {'name': 'ping', 'available': True},
+        ].sort()
+
+        self.assertEqual(expected, azs)
+
 
 class VolumeTestCase(BaseVolumeTestCase):
 
@@ -3243,17 +3269,6 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume.delete_snapshot(self.context, snapshot)
         self.volume.delete_volume(self.context, volume['id'])
 
-    def test_delete_volume_in_consistency_group(self):
-        """Test deleting a volume that's tied to a consistency group fails."""
-        volume_api = cinder.volume.api.API()
-        volume = tests_utils.create_volume(self.context, **self.volume_params)
-        consistencygroup_id = '12345678-1234-5678-1234-567812345678'
-        volume = db.volume_update(self.context, volume['id'],
-                                  {'status': 'available',
-                                   'consistencygroup_id': consistencygroup_id})
-        self.assertRaises(exception.InvalidVolume,
-                          volume_api.delete, self.context, volume)
-
     def test_can_delete_errored_snapshot(self):
         """Test snapshot can be created and deleted."""
         volume = tests_utils.create_volume(self.context, **self.volume_params)
@@ -4115,139 +4130,134 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume.delete_volume(self.context, volume_dst['id'])
         self.volume.delete_volume(self.context, volume_src['id'])
 
-    @mock.patch('cinder.db.volume_update')
-    def test_update_migrated_volume(self, volume_update):
-        fake_host = 'fake_host'
-        fake_new_host = 'fake_new_host'
-        fake_update = {'_name_id': 'updated_id',
-                       'provider_location': 'updated_location'}
-        fake_elevated = 'fake_elevated'
-        volume = tests_utils.create_volume(self.context, size=1,
-                                           status='available',
-                                           host=fake_host)
-        new_volume = tests_utils.create_volume(
-            self.context, size=1,
-            status='available',
-            provider_location='fake_provider_location',
-            _name_id='fake_name_id',
-            host=fake_new_host)
-        fake_update_error = {'_name_id': new_volume['_name_id'],
-                             'provider_location':
-                             new_volume['provider_location']}
-        expected_update = {'_name_id': volume['_name_id'],
-                           'provider_location': volume['provider_location']}
-        with mock.patch.object(self.volume.driver,
-                               'update_migrated_volume') as \
-                migrate_update,\
-                mock.patch.object(self.context, 'elevated') as elevated:
-            migrate_update.return_value = fake_update
-            elevated.return_value = fake_elevated
-            self.volume.update_migrated_volume(self.context, volume,
-                                               new_volume, 'available')
-            volume_update.assert_has_calls((
-                mock.call(fake_elevated, volume['id'], fake_update),
-                mock.call(fake_elevated, new_volume['id'], expected_update)))
+    def test_clean_temporary_volume(self):
+        def fake_delete_volume(ctxt, volume):
+            db.volume_destroy(ctxt, volume['id'])
 
-            # Test the case for update_migrated_volume not implemented
-            # for the driver.
-            migrate_update.reset_mock()
-            volume_update.reset_mock()
-            migrate_update.side_effect = NotImplementedError
-            self.volume.update_migrated_volume(self.context, volume,
-                                               new_volume, 'available')
-            volume_update.assert_has_calls((
-                mock.call(fake_elevated, volume['id'], fake_update_error),
-                mock.call(fake_elevated, new_volume['id'], expected_update)))
+        fake_volume = tests_utils.create_volume(self.context, size=1,
+                                                host=CONF.host)
+        fake_new_volume = tests_utils.create_volume(self.context, size=1,
+                                                    host=CONF.host)
+        # Check when the migrated volume is in migration
+        db.volume_update(self.context, fake_volume['id'],
+                         {'migration_status': 'migrating'})
+        # 1. Only clean the db
+        self.volume._clean_temporary_volume(self.context, fake_volume['id'],
+                                            fake_new_volume['id'],
+                                            clean_db_only=True)
+        self.assertRaises(exception.VolumeNotFound,
+                          db.volume_get, self.context,
+                          fake_new_volume['id'])
 
-    def test_list_availability_zones_enabled_service(self):
-        services = [
-            {'availability_zone': 'ping', 'disabled': 0},
-            {'availability_zone': 'ping', 'disabled': 1},
-            {'availability_zone': 'pong', 'disabled': 0},
-            {'availability_zone': 'pung', 'disabled': 1},
-        ]
+        # 2. Delete the backend storage
+        fake_new_volume = tests_utils.create_volume(self.context, size=1,
+                                                    host=CONF.host)
+        with mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume') as \
+                mock_delete_volume:
+            mock_delete_volume.side_effect = fake_delete_volume
+            self.volume._clean_temporary_volume(self.context,
+                                                fake_volume['id'],
+                                                fake_new_volume['id'],
+                                                clean_db_only=False)
+            self.assertRaises(exception.VolumeNotFound,
+                              db.volume_get, self.context,
+                              fake_new_volume['id'])
 
-        def stub_service_get_all_by_topic(*args, **kwargs):
-            return services
+        # Check when the migrated volume is not in migration
+        fake_new_volume = tests_utils.create_volume(self.context, size=1,
+                                                    host=CONF.host)
+        db.volume_update(self.context, fake_volume['id'],
+                         {'migration_status': 'non-migrating'})
+        self.volume._clean_temporary_volume(self.context, fake_volume['id'],
+                                            fake_new_volume['id'])
+        volume = db.volume_get(context.get_admin_context(),
+                               fake_new_volume['id'])
+        self.assertIsNone(volume['migration_status'])
 
-        self.stubs.Set(db, 'service_get_all_by_topic',
-                       stub_service_get_all_by_topic)
+    def test_update_volume_readonly_flag(self):
+        """Test volume readonly flag can be updated at API level."""
+        # create a volume and assign to host
+        volume = tests_utils.create_volume(self.context,
+                                           admin_metadata={'readonly': 'True'},
+                                           **self.volume_params)
+        self.volume.create_volume(self.context, volume['id'])
+        volume['status'] = 'in-use'
 
         volume_api = cinder.volume.api.API()
-        azs = volume_api.list_availability_zones()
-        azs = list(azs).sort()
 
-        expected = [
-            {'name': 'pung', 'available': False},
-            {'name': 'pong', 'available': True},
-            {'name': 'ping', 'available': True},
-        ].sort()
-
-        self.assertEqual(expected, azs)
-
-    @mock.patch.object(utils, 'brick_get_connector_properties')
-    @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
-    @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
-    @mock.patch.object(volutils, 'copy_volume')
-    @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
-    def test_copy_volume_data(self,
-                              mock_get_capabilities,
-                              mock_copy,
-                              mock_detach,
-                              mock_attach,
-                              mock_get_connector):
-        """Test function of _copy_volume_data."""
+        # Update fails when status != available
+        self.assertRaises(exception.InvalidVolume,
+                          volume_api.update_readonly_flag,
+                          self.context,
+                          volume,
+                          False)
 
-        src_vol = tests_utils.create_volume(self.context, size=1,
-                                            host=CONF.host)
-        dest_vol = tests_utils.create_volume(self.context, size=1,
-                                             host=CONF.host)
-        mock_get_connector.return_value = {}
-        self.volume.driver._throttle = mock.MagicMock()
+        volume['status'] = 'available'
 
-        attach_expected = [
-            mock.call(self.context, dest_vol, {}, remote=False),
-            mock.call(self.context, src_vol, {}, remote=False)]
+        # works when volume in 'available' status
+        volume_api.update_readonly_flag(self.context, volume, False)
 
-        detach_expected = [
-            mock.call(self.context, {'device': {'path': 'bar'}},
-                      dest_vol, {}, force=False, remote=False),
-            mock.call(self.context, {'device': {'path': 'foo'}},
-                      src_vol, {}, force=False, remote=False)]
+        volume = db.volume_get(context.get_admin_context(), volume['id'])
+        self.assertEqual('available', volume['status'])
+        admin_metadata = volume['volume_admin_metadata']
+        self.assertEqual(1, len(admin_metadata))
+        self.assertEqual('readonly', admin_metadata[0]['key'])
+        self.assertEqual('False', admin_metadata[0]['value'])
 
-        attach_volume_returns = [
-            {'device': {'path': 'bar'}},
-            {'device': {'path': 'foo'}}
-        ]
+        # clean up
+        self.volume.delete_volume(self.context, volume['id'])
 
-        #  Test case for sparse_copy_volume = False
-        mock_attach.side_effect = attach_volume_returns
-        mock_get_capabilities.return_value = {}
-        self.volume._copy_volume_data(self.context,
-                                      src_vol,
-                                      dest_vol)
+    def test_secure_file_operations_enabled(self):
+        """Test secure file operations setting for base driver.
 
-        self.assertEqual(attach_expected, mock_attach.mock_calls)
-        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
-        self.assertEqual(detach_expected, mock_detach.mock_calls)
+        General, non network file system based drivers do not have
+        anything to do with "secure_file_operations". This test verifies that
+        calling the method always returns False.
+        """
+        ret_flag = self.volume.driver.secure_file_operations_enabled()
+        self.assertFalse(ret_flag)
 
-        #  Test case for sparse_copy_volume = True
-        mock_attach.reset_mock()
-        mock_detach.reset_mock()
-        mock_attach.side_effect = attach_volume_returns
-        mock_get_capabilities.return_value = {'sparse_copy_volume': True}
-        self.volume._copy_volume_data(self.context,
-                                      src_vol,
-                                      dest_vol)
+    @mock.patch('cinder.volume.flows.common.make_pretty_name',
+                new=mock.MagicMock())
+    @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.create_volume',
+                return_value=None)
+    @mock.patch('cinder.volume.flows.manager.create_volume.'
+                'CreateVolumeFromSpecTask.execute',
+                side_effect=exception.DriverNotInitialized())
+    def test_create_volume_raise_rescheduled_exception(self, mock_execute,
+                                                       mock_reschedule):
+        # Create source volume
+        test_vol = tests_utils.create_volume(self.context,
+                                             **self.volume_params)
+        test_vol_id = test_vol['id']
+        self.assertRaises(exception.DriverNotInitialized,
+                          self.volume.create_volume,
+                          self.context, test_vol_id,
+                          {'volume_properties': self.volume_params},
+                          {'retry': {'num_attempts': 1, 'host': []}})
+        self.assertTrue(mock_reschedule.called)
+        volume = db.volume_get(context.get_admin_context(), test_vol_id)
+        self.assertEqual('creating', volume['status'])
 
-        self.assertEqual(attach_expected, mock_attach.mock_calls)
-        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
-        self.assertEqual(detach_expected, mock_detach.mock_calls)
+    @mock.patch('cinder.volume.flows.manager.create_volume.'
+                'CreateVolumeFromSpecTask.execute')
+    def test_create_volume_raise_unrescheduled_exception(self, mock_execute):
+        # create source volume
+        test_vol = tests_utils.create_volume(self.context,
+                                             **self.volume_params)
+        test_vol_id = test_vol['id']
+        mock_execute.side_effect = exception.VolumeNotFound(
+            volume_id=test_vol_id)
+        self.assertRaises(exception.VolumeNotFound,
+                          self.volume.create_volume,
+                          self.context, test_vol_id,
+                          {'volume_properties': self.volume_params},
+                          {'retry': {'num_attempts': 1, 'host': []}})
+        volume = db.volume_get(context.get_admin_context(), test_vol_id)
+        self.assertEqual('error', volume['status'])
 
-        # cleanup resource
-        db.volume_destroy(self.context, src_vol['id'])
-        db.volume_destroy(self.context, dest_vol['id'])
 
+class VolumeMigrationTestCase(VolumeTestCase):
     def test_migrate_volume_driver(self):
         """Test volume migration done by driver."""
         # stub out driver and rpc functions
@@ -4404,52 +4414,54 @@ class VolumeTestCase(BaseVolumeTestCase):
             self.assertEqual('error', volume['migration_status'])
             self.assertEqual('available', volume['status'])
 
-    def test_clean_temporary_volume(self):
-        def fake_delete_volume(ctxt, volume):
-            db.volume_destroy(ctxt, volume['id'])
-
-        fake_volume = tests_utils.create_volume(self.context, size=1,
-                                                host=CONF.host)
-        fake_new_volume = tests_utils.create_volume(self.context, size=1,
-                                                    host=CONF.host)
-        # Check when the migrated volume is in migration
-        db.volume_update(self.context, fake_volume['id'],
-                         {'migration_status': 'migrating'})
-        # 1. Only clean the db
-        self.volume._clean_temporary_volume(self.context, fake_volume['id'],
-                                            fake_new_volume['id'],
-                                            clean_db_only=True)
-        self.assertRaises(exception.VolumeNotFound,
-                          db.volume_get, self.context,
-                          fake_new_volume['id'])
-
-        # 2. Delete the backend storage
-        fake_new_volume = tests_utils.create_volume(self.context, size=1,
-                                                    host=CONF.host)
-        with mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume') as \
-                mock_delete_volume:
-            mock_delete_volume.side_effect = fake_delete_volume
-            self.volume._clean_temporary_volume(self.context,
-                                                fake_volume['id'],
-                                                fake_new_volume['id'],
-                                                clean_db_only=False)
-            self.assertRaises(exception.VolumeNotFound,
-                              db.volume_get, self.context,
-                              fake_new_volume['id'])
+    @mock.patch('cinder.db.volume_update')
+    def test_update_migrated_volume(self, volume_update):
+        fake_host = 'fake_host'
+        fake_new_host = 'fake_new_host'
+        fake_update = {'_name_id': 'updated_id',
+                       'provider_location': 'updated_location'}
+        fake_elevated = 'fake_elevated'
+        volume = tests_utils.create_volume(self.context, size=1,
+                                           status='available',
+                                           host=fake_host)
+        new_volume = tests_utils.create_volume(
+            self.context, size=1,
+            status='available',
+            provider_location='fake_provider_location',
+            _name_id='fake_name_id',
+            host=fake_new_host)
+        new_volume['_name_id'] = 'fake_name_id'
+        new_volume['provider_location'] = 'fake_provider_location'
+        fake_update_error = {'_name_id': new_volume['_name_id'],
+                             'provider_location':
+                             new_volume['provider_location']}
+        expected_update = {'_name_id': volume['_name_id'],
+                           'provider_location': volume['provider_location']}
+        with mock.patch.object(self.volume.driver,
+                               'update_migrated_volume') as migrate_update,\
+                mock.patch.object(self.context, 'elevated') as elevated:
+            migrate_update.return_value = fake_update
+            elevated.return_value = fake_elevated
+            self.volume.update_migrated_volume(self.context, volume,
+                                               new_volume, 'available')
+            volume_update.assert_has_calls((
+                mock.call(fake_elevated, volume['id'], fake_update),
+                mock.call(fake_elevated, new_volume['id'], expected_update)))
 
-        # Check when the migrated volume is not in migration
-        fake_new_volume = tests_utils.create_volume(self.context, size=1,
-                                                    host=CONF.host)
-        db.volume_update(self.context, fake_volume['id'],
-                         {'migration_status': 'non-migrating'})
-        self.volume._clean_temporary_volume(self.context, fake_volume['id'],
-                                            fake_new_volume['id'])
-        volume = db.volume_get(context.get_admin_context(),
-                               fake_new_volume['id'])
-        self.assertIsNone(volume['migration_status'])
+            # Test the case for update_migrated_volume not implemented
+            # for the driver.
+            migrate_update.reset_mock()
+            volume_update.reset_mock()
+            migrate_update.side_effect = NotImplementedError
+            self.volume.update_migrated_volume(self.context, volume,
+                                               new_volume, 'available')
+            volume_update.assert_has_calls((
+                mock.call(fake_elevated, volume['id'], fake_update_error),
+                mock.call(fake_elevated, new_volume['id'], expected_update)))
 
     def test_migrate_volume_generic_create_volume_error(self):
         self.expected_status = 'error'
+
         with mock.patch.object(self.volume.driver, 'migrate_volume'), \
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume') as \
                 mock_create_volume, \
@@ -4807,38 +4819,18 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume.driver._initialized = True
         self.volume.delete_volume(self.context, volume['id'])
 
-    def test_update_volume_readonly_flag(self):
-        """Test volume readonly flag can be updated at API level."""
-        # create a volume and assign to host
-        volume = tests_utils.create_volume(self.context,
-                                           admin_metadata={'readonly': 'True'},
-                                           **self.volume_params)
-        self.volume.create_volume(self.context, volume['id'])
-        volume['status'] = 'in-use'
 
+class ConsistencyGroupTestCase(BaseVolumeTestCase):
+    def test_delete_volume_in_consistency_group(self):
+        """Test deleting a volume that's tied to a consistency group fails."""
         volume_api = cinder.volume.api.API()
-
-        # Update fails when status != available
+        volume = tests_utils.create_volume(self.context, **self.volume_params)
+        consistencygroup_id = '12345678-1234-5678-1234-567812345678'
+        volume = db.volume_update(self.context, volume['id'],
+                                  {'status': 'available',
+                                   'consistencygroup_id': consistencygroup_id})
         self.assertRaises(exception.InvalidVolume,
-                          volume_api.update_readonly_flag,
-                          self.context,
-                          volume,
-                          False)
-
-        volume['status'] = 'available'
-
-        # works when volume in 'available' status
-        volume_api.update_readonly_flag(self.context, volume, False)
-
-        volume = db.volume_get(context.get_admin_context(), volume['id'])
-        self.assertEqual('available', volume['status'])
-        admin_metadata = volume['volume_admin_metadata']
-        self.assertEqual(1, len(admin_metadata))
-        self.assertEqual('readonly', admin_metadata[0]['key'])
-        self.assertEqual('False', admin_metadata[0]['value'])
-
-        # clean up
-        self.volume.delete_volume(self.context, volume['id'])
+                          volume_api.delete, self.context, volume)
 
     @mock.patch.object(CGQUOTAS, "reserve",
                        return_value=["RESERVATION"])
@@ -5421,55 +5413,6 @@ class VolumeTestCase(BaseVolumeTestCase):
         # Group is not deleted
         self.assertEqual('available', cg.status)
 
-    def test_secure_file_operations_enabled(self):
-        """Test secure file operations setting for base driver.
-
-        General, non network file system based drivers do not have
-        anything to do with "secure_file_operations". This test verifies that
-        calling the method always returns False.
-        """
-        ret_flag = self.volume.driver.secure_file_operations_enabled()
-        self.assertFalse(ret_flag)
-
-    @mock.patch('cinder.volume.flows.common.make_pretty_name',
-                new=mock.MagicMock())
-    @mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.create_volume',
-                return_value=None)
-    @mock.patch('cinder.volume.flows.manager.create_volume.'
-                'CreateVolumeFromSpecTask.execute',
-                side_effect=exception.DriverNotInitialized())
-    def test_create_volume_raise_rescheduled_exception(self, mock_execute,
-                                                       mock_reschedule):
-        # Create source volume
-        test_vol = tests_utils.create_volume(self.context,
-                                             **self.volume_params)
-        test_vol_id = test_vol['id']
-        self.assertRaises(exception.DriverNotInitialized,
-                          self.volume.create_volume,
-                          self.context, test_vol_id,
-                          {'volume_properties': self.volume_params},
-                          {'retry': {'num_attempts': 1, 'host': []}})
-        self.assertTrue(mock_reschedule.called)
-        volume = db.volume_get(context.get_admin_context(), test_vol_id)
-        self.assertEqual('creating', volume['status'])
-
-    @mock.patch('cinder.volume.flows.manager.create_volume.'
-                'CreateVolumeFromSpecTask.execute')
-    def test_create_volume_raise_unrescheduled_exception(self, mock_execute):
-        # create source volume
-        test_vol = tests_utils.create_volume(self.context,
-                                             **self.volume_params)
-        test_vol_id = test_vol['id']
-        mock_execute.side_effect = exception.VolumeNotFound(
-            volume_id=test_vol_id)
-        self.assertRaises(exception.VolumeNotFound,
-                          self.volume.create_volume,
-                          self.context, test_vol_id,
-                          {'volume_properties': self.volume_params},
-                          {'retry': {'num_attempts': 1, 'host': []}})
-        volume = db.volume_get(context.get_admin_context(), test_vol_id)
-        self.assertEqual('error', volume['status'])
-
     def test_create_volume_with_consistencygroup_invalid_type(self):
         """Test volume creation with ConsistencyGroup & invalid volume type."""
         vol_type = db.volume_type_create(
@@ -6405,6 +6348,69 @@ class GenericVolumeDriverTestCase(DriverTestCase):
         db.volume_destroy(self.context, src_vol['id'])
         db.volume_destroy(self.context, dest_vol['id'])
 
+    @mock.patch.object(utils, 'brick_get_connector_properties')
+    @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
+    @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
+    @mock.patch.object(volutils, 'copy_volume')
+    @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
+    def test_copy_volume_data_mgr(self,
+                                  mock_get_capabilities,
+                                  mock_copy,
+                                  mock_detach,
+                                  mock_attach,
+                                  mock_get_connector):
+        """Test function of _copy_volume_data."""
+
+        src_vol = tests_utils.create_volume(self.context, size=1,
+                                            host=CONF.host)
+        dest_vol = tests_utils.create_volume(self.context, size=1,
+                                             host=CONF.host)
+        mock_get_connector.return_value = {}
+        self.volume.driver._throttle = mock.MagicMock()
+
+        attach_expected = [
+            mock.call(self.context, dest_vol, {}, remote=False),
+            mock.call(self.context, src_vol, {}, remote=False)]
+
+        detach_expected = [
+            mock.call(self.context, {'device': {'path': 'bar'}},
+                      dest_vol, {}, force=False, remote=False),
+            mock.call(self.context, {'device': {'path': 'foo'}},
+                      src_vol, {}, force=False, remote=False)]
+
+        attach_volume_returns = [
+            {'device': {'path': 'bar'}},
+            {'device': {'path': 'foo'}}
+        ]
+
+        #  Test case for sparse_copy_volume = False
+        mock_attach.side_effect = attach_volume_returns
+        mock_get_capabilities.return_value = {}
+        self.volume._copy_volume_data(self.context,
+                                      src_vol,
+                                      dest_vol)
+
+        self.assertEqual(attach_expected, mock_attach.mock_calls)
+        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
+        self.assertEqual(detach_expected, mock_detach.mock_calls)
+
+        #  Test case for sparse_copy_volume = True
+        mock_attach.reset_mock()
+        mock_detach.reset_mock()
+        mock_attach.side_effect = attach_volume_returns
+        mock_get_capabilities.return_value = {'sparse_copy_volume': True}
+        self.volume._copy_volume_data(self.context,
+                                      src_vol,
+                                      dest_vol)
+
+        self.assertEqual(attach_expected, mock_attach.mock_calls)
+        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
+        self.assertEqual(detach_expected, mock_detach.mock_calls)
+
+        # cleanup resource
+        db.volume_destroy(self.context, src_vol['id'])
+        db.volume_destroy(self.context, dest_vol['id'])
+
 
 class LVMISCSIVolumeDriverTestCase(DriverTestCase):
     """Test case for VolumeDriver"""