# under the License.
"""Unit tests for the GlusterFS driver module."""
+import contextlib
import errno
+import mock
import os
import tempfile
mox.VerifyAll()
+ @mock.patch('os.remove')
+ @mock.patch('os.path.exists')
+ def test_delete_volume_with_info_file(self, mock_path_exists, mock_remove):
+ mock_path_exists.return_value = True
+ info_file = self.TEST_LOCAL_PATH + '.info'
+ volume = self._simple_volume()
+
+ with contextlib.nested(
+ mock.patch.object(self._driver, '_ensure_share_mounted'),
+ mock.patch.object(self._driver, 'local_path'),
+ mock.patch.object(self._driver, '_execute')
+ ) as (mock_ensure_share_mounted, mock_local_path, mock_execute):
+ mock_local_path.return_value = self.TEST_LOCAL_PATH
+
+ self._driver.delete_volume(volume)
+
+ mock_ensure_share_mounted.assert_called_once_with(
+ volume['provider_location'])
+ mock_local_path.assert_called_once_with(volume)
+ mock_execute.assert_called_once_with('rm', '-f',
+ self.TEST_LOCAL_PATH,
+ run_as_root=True)
+ mock_path_exists.assert_called_once_with(info_file)
+ mock_remove.assert_called_once_with(info_file)
+
def test_create_snapshot(self):
(mox, drv) = self._mox, self._driver
self._execute('rm', '-f', mounted_path, run_as_root=True)
+ info_path = mounted_path + '.info'
+ if os.path.exists(info_path):
+ os.remove(info_path)
+
@utils.synchronized('glusterfs', external=False)
def create_snapshot(self, snapshot):
"""Apply locking to the create snapshot operation."""