Add Posix backup driver. Supports simple nas too.
Co-Authored-By: Bharat Kumar Kobagana <bharat.kobagana@redhat.com>
Partially Implements: blueprint nfs-backup
Change-Id: I99383aa23b6dda217c8df6b33561111a8823b452
# Copyright (C) 2015 Tom Barron <tpb@dyncloud.net>
+# Copyright (C) 2015 Kevin Fox <kevin@efox.cc>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
"""Implementation of a backup service that uses NFS storage as the backend."""
-import os
-import os.path
-
from os_brick.remotefs import remotefs as remotefs_brick
from oslo_config import cfg
from oslo_log import log as logging
-from cinder.backup import chunkeddriver
+from cinder.backup.drivers import posix
from cinder import exception
from cinder.i18n import _
from cinder import utils
LOG = logging.getLogger(__name__)
-SHA_SIZE = 32768
-# Multiple of SHA_SIZE, close to a characteristic OS max file system size.
-BACKUP_FILE_SIZE = 61035 * 32768
-
nfsbackup_service_opts = [
- cfg.IntOpt('backup_file_size',
- default=BACKUP_FILE_SIZE,
- help='The maximum size in bytes of the files used to hold '
- 'backups. If the volume being backed up exceeds this '
- 'size, then it will be backed up into multiple files. '
- 'backup_file_size must be a multiple of '
- 'backup_sha_block_size_bytes.'),
- cfg.IntOpt('backup_sha_block_size_bytes',
- default=SHA_SIZE,
- help='The size in bytes that changes are tracked '
- 'for incremental backups. backup_file_size '
- 'has to be multiple of backup_sha_block_size_bytes.'),
- cfg.BoolOpt('backup_enable_progress_timer',
- default=True,
- help='Enable or Disable the timer to send the periodic '
- 'progress notifications to Ceilometer when backing '
- 'up the volume to the backend storage. The '
- 'default value is True to enable the timer.'),
cfg.StrOpt('backup_mount_point_base',
default='$state_path/backup_mount',
help='Base dir containing mount point for NFS share.'),
cfg.StrOpt('backup_share',
default=None,
- help='NFS share in fqdn:path, ipv4addr:path, '
+ help='NFS share in hostname:path, ipv4addr:path, '
'or "[ipv6addr]:path" format.'),
cfg.StrOpt('backup_mount_options',
default=None,
help=('Mount options passed to the NFS client. See NFS '
'man page for details.')),
- cfg.StrOpt('backup_container',
- help='Custom container to use for backups.'),
]
CONF = cfg.CONF
CONF.register_opts(nfsbackup_service_opts)
-class NFSBackupDriver(chunkeddriver.ChunkedBackupDriver):
+class NFSBackupDriver(posix.PosixBackupDriver):
"""Provides backup, restore and delete using NFS supplied repository."""
def __init__(self, context, db_driver=None):
self._check_configuration()
- chunk_size_bytes = CONF.backup_file_size
- sha_block_size_bytes = CONF.backup_sha_block_size_bytes
- backup_default_container = CONF.backup_container
- enable_progress_timer = CONF.backup_enable_progress_timer
- super(NFSBackupDriver, self).__init__(context, chunk_size_bytes,
- sha_block_size_bytes,
- backup_default_container,
- enable_progress_timer,
- db_driver)
self.backup_mount_point_base = CONF.backup_mount_point_base
self.backup_share = CONF.backup_share
self.mount_options = CONF.backup_mount_options or {}
- self.backup_path = self._init_backup_repo_path()
- LOG.debug("Using NFS backup repository: %s", self.backup_path)
+ backup_path = self._init_backup_repo_path()
+ LOG.debug("Using NFS backup repository: %s", backup_path)
+ super(NFSBackupDriver, self).__init__(context,
+ backup_path=backup_path)
@staticmethod
def _check_configuration():
remotefsclient.mount(self.backup_share)
return remotefsclient.get_mount_point(self.backup_share)
- def update_container_name(self, backup, container):
- if container is not None:
- return container
- id = backup['id']
- return os.path.join(id[0:2], id[2:4], id)
-
- def put_container(self, container):
- path = os.path.join(self.backup_path, container)
- if not os.path.exists(path):
- os.makedirs(path)
- os.chmod(path, 0o770)
-
- def get_container_entries(self, container, prefix):
- path = os.path.join(self.backup_path, container)
- return [i for i in os.listdir(path) if i.startswith(prefix)]
-
- def get_object_writer(self, container, object_name, extra_metadata=None):
- path = os.path.join(self.backup_path, container, object_name)
- file = open(path, 'w')
- os.chmod(path, 0o660)
- return file
-
- def get_object_reader(self, container, object_name, extra_metadata=None):
- path = os.path.join(self.backup_path, container, object_name)
- return open(path, 'r')
-
- def delete_object(self, container, object_name):
- # TODO(tbarron): clean up the container path if it is empty
- path = os.path.join(self.backup_path, container, object_name)
- os.remove(path)
-
- def _generate_object_name_prefix(self, backup):
- return 'backup'
-
- def get_extra_metadata(self, backup, volume):
- return None
-
def get_backup_driver(context):
return NFSBackupDriver(context)
--- /dev/null
+# Copyright (C) 2015 Tom Barron <tpb@dyncloud.net>
+# Copyright (C) 2015 Kevin Fox <kevin@efox.cc>
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Implementation of a backup service that uses a posix filesystem as the
+ backend."""
+
+import os
+import os.path
+import stat
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from cinder.backup import chunkeddriver
+from cinder import exception
+
+LOG = logging.getLogger(__name__)
+
+SHA_SIZE = 32768
+# Multiple of SHA_SIZE, close to a characteristic OS max file system size.
+BACKUP_FILE_SIZE = 61035 * 32768
+
+posixbackup_service_opts = [
+ cfg.IntOpt('backup_file_size',
+ default=BACKUP_FILE_SIZE,
+ help='The maximum size in bytes of the files used to hold '
+ 'backups. If the volume being backed up exceeds this '
+ 'size, then it will be backed up into multiple files.'
+ 'backup_file_size must be a multiple of '
+ 'backup_sha_block_size_bytes.'),
+ cfg.IntOpt('backup_sha_block_size_bytes',
+ default=SHA_SIZE,
+ help='The size in bytes that changes are tracked '
+ 'for incremental backups. backup_file_size has '
+ 'to be multiple of backup_sha_block_size_bytes.'),
+ cfg.BoolOpt('backup_enable_progress_timer',
+ default=True,
+ help='Enable or Disable the timer to send the periodic '
+ 'progress notifications to Ceilometer when backing '
+ 'up the volume to the backend storage. The '
+ 'default value is True to enable the timer.'),
+ cfg.StrOpt('backup_posix_path',
+ default='$state_path/backup',
+ help='Path specifying where to store backups.'),
+ cfg.StrOpt('backup_container',
+ help='Custom directory to use for backups.'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(posixbackup_service_opts)
+
+
+class PosixBackupDriver(chunkeddriver.ChunkedBackupDriver):
+ """Provides backup, restore and delete using a Posix file system."""
+
+ def __init__(self, context, db_driver=None, backup_path=None):
+ chunk_size_bytes = CONF.backup_file_size
+ sha_block_size_bytes = CONF.backup_sha_block_size_bytes
+ backup_default_container = CONF.backup_container
+ enable_progress_timer = CONF.backup_enable_progress_timer
+ super(PosixBackupDriver, self).__init__(context, chunk_size_bytes,
+ sha_block_size_bytes,
+ backup_default_container,
+ enable_progress_timer,
+ db_driver)
+ self.backup_path = backup_path
+ if not backup_path:
+ self.backup_path = CONF.backup_posix_path
+ if not self.backup_path:
+ raise exception.ConfigNotFound(path='backup_path')
+ LOG.debug("Using backup repository: %s", self.backup_path)
+
+ def update_container_name(self, backup, container):
+ if container is not None:
+ return container
+ id = backup['id']
+ return os.path.join(id[0:2], id[2:4], id)
+
+ def put_container(self, container):
+ path = os.path.join(self.backup_path, container)
+ if not os.path.exists(path):
+ os.makedirs(path)
+ permissions = (
+ stat.S_IRUSR |
+ stat.S_IWUSR |
+ stat.S_IXUSR |
+ stat.S_IRGRP |
+ stat.S_IWGRP |
+ stat.S_IXGRP)
+ os.chmod(path, permissions)
+
+ def get_container_entries(self, container, prefix):
+ path = os.path.join(self.backup_path, container)
+ return [i for i in os.listdir(path) if i.startswith(prefix)]
+
+ def get_object_writer(self, container, object_name, extra_metadata=None):
+ path = os.path.join(self.backup_path, container, object_name)
+ f = open(path, 'w')
+ permissions = (
+ stat.S_IRUSR |
+ stat.S_IWUSR |
+ stat.S_IRGRP |
+ stat.S_IWGRP)
+ os.chmod(path, permissions)
+ return f
+
+ def get_object_reader(self, container, object_name, extra_metadata=None):
+ path = os.path.join(self.backup_path, container, object_name)
+ return open(path, 'r')
+
+ def delete_object(self, container, object_name):
+ # TODO(tbarron): clean up the container path if it is empty
+ path = os.path.join(self.backup_path, container, object_name)
+ os.remove(path)
+
+ def _generate_object_name_prefix(self, backup):
+ return 'backup'
+
+ def get_extra_metadata(self, backup, volume):
+ return None
+
+
+def get_backup_driver(context):
+ return PosixBackupDriver(context)
"""
import bz2
-import exceptions
import filecmp
import hashlib
import os
import mock
from os_brick.remotefs import remotefs as remotefs_brick
from oslo_config import cfg
-from six.moves import builtins
from cinder.backup.drivers import nfs
from cinder import context
CONF = cfg.CONF
-FAKE_BACKUP_ENABLE_PROGRESS_TIMER = True
FAKE_BACKUP_MOUNT_POINT_BASE = '/fake/mount-point-base'
FAKE_HOST = 'fake_host'
FAKE_EXPORT_PATH = 'fake/export/path'
FAKE_BACKUP_SHARE = '%s:/%s' % (FAKE_HOST, FAKE_EXPORT_PATH)
FAKE_BACKUP_PATH = os.path.join(FAKE_BACKUP_MOUNT_POINT_BASE,
FAKE_EXPORT_PATH)
-FAKE_BACKUP_MOUNT_OPTIONS = 'fake_opt1=fake_value1,fake_opt2=fake_value2'
-FAKE_CONTAINER = 'fake/container'
FAKE_BACKUP_ID_PART1 = 'de'
FAKE_BACKUP_ID_PART2 = 'ad'
FAKE_BACKUP_ID_REST = 'beef-whatever'
FAKE_BACKUP_ID = (FAKE_BACKUP_ID_PART1 + FAKE_BACKUP_ID_PART2 +
FAKE_BACKUP_ID_REST)
-FAKE_BACKUP = {'id': FAKE_BACKUP_ID, 'container': None}
UPDATED_CONTAINER_NAME = os.path.join(FAKE_BACKUP_ID_PART1,
FAKE_BACKUP_ID_PART2,
FAKE_BACKUP_ID)
-FAKE_PREFIX = 'prefix-'
-FAKE_CONTAINER_ENTRIES = [FAKE_PREFIX + 'one', FAKE_PREFIX + 'two', 'three']
-EXPECTED_CONTAINER_ENTRIES = [FAKE_PREFIX + 'one', FAKE_PREFIX + 'two']
-FAKE_OBJECT_NAME = 'fake-object-name'
-FAKE_OBJECT_PATH = os.path.join(FAKE_BACKUP_PATH, FAKE_CONTAINER,
- FAKE_OBJECT_NAME)
class BackupNFSShareTestCase(test.TestCase):
FAKE_BACKUP_SHARE)
-class BackupNFSTestCase(test.TestCase):
- def setUp(self):
- super(BackupNFSTestCase, self).setUp()
- self.ctxt = context.get_admin_context()
- self.override_config('backup_enable_progress_timer',
- FAKE_BACKUP_ENABLE_PROGRESS_TIMER)
- self.override_config('backup_mount_point_base',
- FAKE_BACKUP_MOUNT_POINT_BASE)
- self.override_config('backup_share', FAKE_BACKUP_SHARE)
- self.override_config('backup_mount_options', FAKE_BACKUP_MOUNT_OPTIONS)
-
- self.mock_object(nfs.NFSBackupDriver, '_check_configuration')
- self.mock_object(nfs.NFSBackupDriver, '_init_backup_repo_path',
- mock.Mock(return_value=FAKE_BACKUP_PATH))
- self.mock_object(nfs, 'LOG')
-
- self.driver = nfs.NFSBackupDriver(self.ctxt)
-
- def test_init(self):
- self.assertEqual(FAKE_BACKUP_ENABLE_PROGRESS_TIMER,
- self.driver.enable_progress_timer)
- self.assertEqual(FAKE_BACKUP_MOUNT_POINT_BASE,
- self.driver.backup_mount_point_base)
- self.assertEqual(FAKE_BACKUP_SHARE,
- self.driver.backup_share)
- self.assertEqual(FAKE_BACKUP_MOUNT_OPTIONS,
- self.driver.mount_options)
- self.assertTrue(self.driver._check_configuration.called)
- self.assertTrue(self.driver._init_backup_repo_path.called)
- self.assertTrue(nfs.LOG.debug.called)
-
- def test_update_container_name_container_passed(self):
- result = self.driver.update_container_name(FAKE_BACKUP, FAKE_CONTAINER)
-
- self.assertEqual(FAKE_CONTAINER, result)
-
- def test_update_container_na_container_passed(self):
- result = self.driver.update_container_name(FAKE_BACKUP, None)
-
- self.assertEqual(UPDATED_CONTAINER_NAME, result)
-
- def test_put_container(self):
- self.mock_object(os.path, 'exists', mock.Mock(return_value=False))
- self.mock_object(os, 'makedirs')
- self.mock_object(os, 'chmod')
- path = os.path.join(self.driver.backup_path, FAKE_CONTAINER)
-
- self.driver.put_container(FAKE_CONTAINER)
-
- os.path.exists.assert_called_once_with(path)
- os.makedirs.assert_called_once_with(path)
- os.chmod.assert_called_once_with(path, 0o770)
-
- def test_put_container_already_exists(self):
- self.mock_object(os.path, 'exists', mock.Mock(return_value=True))
- self.mock_object(os, 'makedirs')
- self.mock_object(os, 'chmod')
- path = os.path.join(self.driver.backup_path, FAKE_CONTAINER)
-
- self.driver.put_container(FAKE_CONTAINER)
-
- os.path.exists.assert_called_once_with(path)
- self.assertEqual(0, os.makedirs.call_count)
- self.assertEqual(0, os.chmod.call_count)
-
- def test_put_container_exception(self):
- self.mock_object(os.path, 'exists', mock.Mock(return_value=False))
- self.mock_object(os, 'makedirs', mock.Mock(
- side_effect=exceptions.OSError))
- self.mock_object(os, 'chmod')
- path = os.path.join(self.driver.backup_path, FAKE_CONTAINER)
-
- self.assertRaises(exceptions.OSError, self.driver.put_container,
- FAKE_CONTAINER)
- os.path.exists.assert_called_once_with(path)
- os.makedirs.called_once_with(path)
- self.assertEqual(0, os.chmod.call_count)
-
- def test_get_container_entries(self):
- self.mock_object(os, 'listdir', mock.Mock(
- return_value=FAKE_CONTAINER_ENTRIES))
-
- result = self.driver.get_container_entries(FAKE_CONTAINER, FAKE_PREFIX)
-
- self.assertEqual(EXPECTED_CONTAINER_ENTRIES, result)
-
- def test_get_container_entries_no_list(self):
- self.mock_object(os, 'listdir', mock.Mock(
- return_value=[]))
-
- result = self.driver.get_container_entries(FAKE_CONTAINER, FAKE_PREFIX)
-
- self.assertEqual([], result)
-
- def test_get_container_entries_no_match(self):
- self.mock_object(os, 'listdir', mock.Mock(
- return_value=FAKE_CONTAINER_ENTRIES))
-
- result = self.driver.get_container_entries(FAKE_CONTAINER,
- FAKE_PREFIX + 'garbage')
-
- self.assertEqual([], result)
-
- def test_get_object_writer(self):
- self.mock_object(builtins, 'open', mock.mock_open())
- self.mock_object(os, 'chmod')
-
- self.driver.get_object_writer(FAKE_CONTAINER, FAKE_OBJECT_NAME)
-
- os.chmod.assert_called_once_with(FAKE_OBJECT_PATH, 0o660)
- builtins.open.assert_called_once_with(FAKE_OBJECT_PATH, 'w')
-
- def test_get_object_reader(self):
- self.mock_object(builtins, 'open', mock.mock_open())
-
- self.driver.get_object_reader(FAKE_CONTAINER, FAKE_OBJECT_NAME)
-
- builtins.open.assert_called_once_with(FAKE_OBJECT_PATH, 'r')
-
- def test_delete_object(self):
- self.mock_object(os, 'remove')
-
- self.driver.delete_object(FAKE_CONTAINER, FAKE_OBJECT_NAME)
-
- def test_delete_nonexistent_object(self):
- self.mock_object(os, 'remove', mock.Mock(
- side_effect=exceptions.OSError))
-
- self.assertRaises(exceptions.OSError,
- self.driver.delete_object, FAKE_CONTAINER,
- FAKE_OBJECT_NAME)
-
-
def fake_md5(arg):
class result(object):
def hexdigest(self):
--- /dev/null
+# Copyright (c) 2015 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests for Posix backup driver.
+
+"""
+
+import exceptions
+import os
+
+import mock
+from six.moves import builtins
+
+from cinder.backup.drivers import posix
+from cinder import context
+from cinder import test
+
+
+FAKE_FILE_SIZE = 52428800
+FAKE_SHA_BLOCK_SIZE_BYTES = 1024
+FAKE_BACKUP_ENABLE_PROGRESS_TIMER = True
+
+FAKE_CONTAINER = 'fake/container'
+FAKE_BACKUP_ID_PART1 = 'de'
+FAKE_BACKUP_ID_PART2 = 'ad'
+FAKE_BACKUP_ID_REST = 'beef-whatever'
+FAKE_BACKUP_ID = (FAKE_BACKUP_ID_PART1 + FAKE_BACKUP_ID_PART2 +
+ FAKE_BACKUP_ID_REST)
+FAKE_BACKUP = {'id': FAKE_BACKUP_ID, 'container': None}
+
+UPDATED_CONTAINER_NAME = os.path.join(FAKE_BACKUP_ID_PART1,
+ FAKE_BACKUP_ID_PART2,
+ FAKE_BACKUP_ID)
+
+FAKE_BACKUP_MOUNT_POINT_BASE = '/fake/mount-point-base'
+FAKE_EXPORT_PATH = 'fake/export/path'
+
+FAKE_BACKUP_POSIX_PATH = os.path.join(FAKE_BACKUP_MOUNT_POINT_BASE,
+ FAKE_EXPORT_PATH)
+
+FAKE_PREFIX = 'prefix-'
+FAKE_CONTAINER_ENTRIES = [FAKE_PREFIX + 'one', FAKE_PREFIX + 'two', 'three']
+EXPECTED_CONTAINER_ENTRIES = [FAKE_PREFIX + 'one', FAKE_PREFIX + 'two']
+FAKE_OBJECT_NAME = 'fake-object-name'
+FAKE_OBJECT_PATH = os.path.join(FAKE_BACKUP_POSIX_PATH, FAKE_CONTAINER,
+ FAKE_OBJECT_NAME)
+
+
+class PosixBackupDriverTestCase(test.TestCase):
+
+ def setUp(self):
+ super(PosixBackupDriverTestCase, self).setUp()
+ self.ctxt = context.get_admin_context()
+
+ self.override_config('backup_file_size',
+ FAKE_FILE_SIZE)
+ self.override_config('backup_sha_block_size_bytes',
+ FAKE_SHA_BLOCK_SIZE_BYTES)
+ self.override_config('backup_enable_progress_timer',
+ FAKE_BACKUP_ENABLE_PROGRESS_TIMER)
+ self.override_config('backup_posix_path',
+ FAKE_BACKUP_POSIX_PATH)
+ self.mock_object(posix, 'LOG')
+
+ self.driver = posix.PosixBackupDriver(self.ctxt)
+
+ def test_init(self):
+ drv = posix.PosixBackupDriver(self.ctxt)
+ self.assertEqual(FAKE_BACKUP_POSIX_PATH,
+ drv.backup_path)
+
+ def test_update_container_name_container_passed(self):
+ result = self.driver.update_container_name(FAKE_BACKUP, FAKE_CONTAINER)
+
+ self.assertEqual(FAKE_CONTAINER, result)
+
+ def test_update_container_na_container_passed(self):
+ result = self.driver.update_container_name(FAKE_BACKUP, None)
+
+ self.assertEqual(UPDATED_CONTAINER_NAME, result)
+
+ def test_put_container(self):
+ self.mock_object(os.path, 'exists', mock.Mock(return_value=False))
+ self.mock_object(os, 'makedirs')
+ self.mock_object(os, 'chmod')
+ path = os.path.join(self.driver.backup_path, FAKE_CONTAINER)
+
+ self.driver.put_container(FAKE_CONTAINER)
+
+ os.path.exists.assert_called_once_with(path)
+ os.makedirs.assert_called_once_with(path)
+ os.chmod.assert_called_once_with(path, 0o770)
+
+ def test_put_container_already_exists(self):
+ self.mock_object(os.path, 'exists', mock.Mock(return_value=True))
+ self.mock_object(os, 'makedirs')
+ self.mock_object(os, 'chmod')
+ path = os.path.join(self.driver.backup_path, FAKE_CONTAINER)
+
+ self.driver.put_container(FAKE_CONTAINER)
+
+ os.path.exists.assert_called_once_with(path)
+ self.assertEqual(0, os.makedirs.call_count)
+ self.assertEqual(0, os.chmod.call_count)
+
+ def test_put_container_exception(self):
+ self.mock_object(os.path, 'exists', mock.Mock(return_value=False))
+ self.mock_object(os, 'makedirs', mock.Mock(
+ side_effect=exceptions.OSError))
+ self.mock_object(os, 'chmod')
+ path = os.path.join(self.driver.backup_path, FAKE_CONTAINER)
+
+ self.assertRaises(exceptions.OSError, self.driver.put_container,
+ FAKE_CONTAINER)
+ os.path.exists.assert_called_once_with(path)
+ os.makedirs.called_once_with(path)
+ self.assertEqual(0, os.chmod.call_count)
+
+ def test_get_container_entries(self):
+ self.mock_object(os, 'listdir', mock.Mock(
+ return_value=FAKE_CONTAINER_ENTRIES))
+
+ result = self.driver.get_container_entries(FAKE_CONTAINER, FAKE_PREFIX)
+
+ self.assertEqual(EXPECTED_CONTAINER_ENTRIES, result)
+
+ def test_get_container_entries_no_list(self):
+ self.mock_object(os, 'listdir', mock.Mock(
+ return_value=[]))
+
+ result = self.driver.get_container_entries(FAKE_CONTAINER, FAKE_PREFIX)
+
+ self.assertEqual([], result)
+
+ def test_get_container_entries_no_match(self):
+ self.mock_object(os, 'listdir', mock.Mock(
+ return_value=FAKE_CONTAINER_ENTRIES))
+
+ result = self.driver.get_container_entries(FAKE_CONTAINER,
+ FAKE_PREFIX + 'garbage')
+
+ self.assertEqual([], result)
+
+ def test_get_object_writer(self):
+ self.mock_object(builtins, 'open', mock.mock_open())
+ self.mock_object(os, 'chmod')
+
+ self.driver.get_object_writer(FAKE_CONTAINER, FAKE_OBJECT_NAME)
+
+ os.chmod.assert_called_once_with(FAKE_OBJECT_PATH, 0o660)
+ builtins.open.assert_called_once_with(FAKE_OBJECT_PATH, 'w')
+
+ def test_get_object_reader(self):
+ self.mock_object(builtins, 'open', mock.mock_open())
+
+ self.driver.get_object_reader(FAKE_CONTAINER, FAKE_OBJECT_NAME)
+
+ builtins.open.assert_called_once_with(FAKE_OBJECT_PATH, 'r')
+
+ def test_delete_object(self):
+ self.mock_object(os, 'remove')
+
+ self.driver.delete_object(FAKE_CONTAINER, FAKE_OBJECT_NAME)
+
+ def test_delete_nonexistent_object(self):
+ self.mock_object(os, 'remove', mock.Mock(
+ side_effect=exceptions.OSError))
+
+ self.assertRaises(exceptions.OSError,
+ self.driver.delete_object, FAKE_CONTAINER,
+ FAKE_OBJECT_NAME)