# Copyright (c) 2013 Zelin.io
+# Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import contextlib
+import errno
import mock
from oslo_concurrency import processutils
import six
from cinder.backup import driver as backup_driver
+from cinder import context
from cinder import db
from cinder import exception
+from cinder.i18n import _
from cinder.image import image_utils
from cinder import test
+from cinder.tests.unit import fake_volume
+from cinder import utils
from cinder.volume import configuration as conf
from cinder.volume.drivers import sheepdog
-
-COLLIE_NODE_INFO = """
+SHEEP_ADDR = '127.0.0.1'
+SHEEP_PORT = 7000
+
+
+class SheepdogDriverTestDataGenerator(object):
+ def __init__(self):
+ self.TEST_VOLUME = self._make_fake_volume(self.TEST_VOL_DATA)
+
+ def sheepdog_cmd_error(self, cmd, exit_code, stdout, stderr):
+ return (('(Command: %(cmd)s) '
+ '(Return Code: %(exit_code)s) '
+ '(Stdout: %(stdout)s) '
+ '(Stderr: %(stderr)s)') %
+ {'cmd': cmd,
+ 'exit_code': exit_code,
+ 'stdout': stdout.replace('\n', '\\n'),
+ 'stderr': stderr.replace('\n', '\\n')})
+
+ def _make_fake_volume(self, volume_data):
+ return fake_volume.fake_volume_obj(context.get_admin_context(),
+ **volume_data)
+
+ CMD_DOG_CLUSTER_INFO = ('env', 'LC_ALL=C', 'LANG=C', 'dog', 'cluster',
+ 'info', '-a', SHEEP_ADDR, '-p', str(SHEEP_PORT))
+
+ TEST_VOL_DATA = {
+ 'size': 1,
+ 'id': '00000000-0000-0000-0000-000000000001',
+ 'provider_auth': None,
+ 'host': 'host@backendsec#unit_test_pool',
+ 'project_id': 'project',
+ 'provider_location': 'location',
+ 'display_name': 'vol1',
+ 'display_description': 'unit test volume',
+ 'volume_type_id': None,
+ 'consistencygroup_id': None,
+ }
+
+ COLLIE_NODE_INFO = """
0 107287605248 3623897354 3%
Total 107287605248 3623897354 3% 54760833024
"""
-COLLIE_CLUSTER_INFO_0_5 = """
+ COLLIE_CLUSTER_INFO_0_5 = """\
Cluster status: running
Cluster created at Tue Jun 25 19:51:41 2013
2013-06-25 19:51:41 1 [127.0.0.1:7000, 127.0.0.1:7001, 127.0.0.1:7002]
"""
-COLLIE_CLUSTER_INFO_0_6 = """
+ COLLIE_CLUSTER_INFO_0_6 = """\
Cluster status: running, auto-recovery enabled
Cluster created at Tue Jun 25 19:51:41 2013
2013-06-25 19:51:41 1 [127.0.0.1:7000, 127.0.0.1:7001, 127.0.0.1:7002]
"""
+ DOG_CLUSTER_RUNNING = """\
+Cluster status: running, auto-recovery enabled
+
+Cluster created at Thu Jun 18 17:24:56 2015
+
+Epoch Time Version [Host:Port:V-Nodes,,,]
+2015-06-18 17:24:56 1 [127.0.0.1:7000:128, 127.0.0.1:7001:128,\
+ 127.0.0.1:7002:128]
+"""
+
+ DOG_CLUSTER_INFO_TO_BE_FORMATTED = """\
+Cluster status: Waiting for cluster to be formatted
+"""
+
+ DOG_CLUSTER_INFO_WAITING_OTHER_NODES = """\
+Cluster status: Waiting for other nodes to join cluster
+
+Cluster created at Thu Jun 18 17:24:56 2015
+
+Epoch Time Version [Host:Port:V-Nodes,,,]
+2015-06-18 17:24:56 1 [127.0.0.1:7000:128, 127.0.0.1:7001:128]
+"""
+
+ DOG_CLUSTER_INFO_SHUTTING_DOWN = """\
+Cluster status: System is shutting down
+"""
+
+ DOG_COMMAND_ERROR_FAIL_TO_CONNECT = """\
+failed to connect to 127.0.0.1:7000: Connection refused
+failed to connect to 127.0.0.1:7000: Connection refused
+Failed to get node list
+"""
+
class FakeImageService(object):
def download(self, context, image_id, path):
self.assertRaises(IOError, self.vdi_wrapper.fileno)
-class SheepdogTestCase(test.TestCase):
+class SheepdogClientTestCase(test.TestCase):
def setUp(self):
- super(SheepdogTestCase, self).setUp()
- self.driver = sheepdog.SheepdogDriver(
- configuration=conf.Configuration(None))
-
+ super(SheepdogClientTestCase, self).setUp()
+ self._cfg = conf.Configuration(None)
+ self._cfg.sheepdog_store_address = SHEEP_ADDR
+ self._cfg.sheepdog_store_port = SHEEP_PORT
+ self.driver = sheepdog.SheepdogDriver(configuration=self._cfg)
+ db_driver = self.driver.configuration.db_driver
+ self.db = importutils.import_module(db_driver)
+ self.driver.db = self.db
+ self.driver.do_setup(None)
+ self.test_data = SheepdogDriverTestDataGenerator()
+ self.client = self.driver.client
+
+ @mock.patch.object(utils, 'execute')
+ def test_run_dog_success(self, fake_execute):
+ args = ('cluster', 'info')
+ expected_cmd = self.test_data.CMD_DOG_CLUSTER_INFO
+ fake_execute.return_value = ('', '')
+ self.client._run_dog(*args)
+ fake_execute.assert_called_once_with(*expected_cmd)
+
+ @mock.patch.object(utils, 'execute')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_run_dog_command_not_found(self, fake_logger, fake_execute):
+ args = ('cluster', 'info')
+ expected_msg = 'No such file or directory'
+ expected_errno = errno.ENOENT
+ fake_execute.side_effect = OSError(expected_errno, expected_msg)
+ self.assertRaises(OSError, self.client._run_dog, *args)
+ self.assertTrue(fake_logger.error.called)
+
+ @mock.patch.object(utils, 'execute')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_run_dog_operation_not_permitted(self, fake_logger, fake_execute):
+ args = ('cluster', 'info')
+ expected_msg = 'Operation not permitted'
+ expected_errno = errno.EPERM
+ fake_execute.side_effect = OSError(expected_errno, expected_msg)
+ self.assertRaises(OSError, self.client._run_dog, *args)
+ self.assertTrue(fake_logger.error.called)
+
+ @mock.patch.object(utils, 'execute')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_run_dog_unknown_error(self, fake_logger, fake_execute):
+ args = ('cluster', 'info')
+ cmd = self.test_data.CMD_DOG_CLUSTER_INFO
+ exit_code = 1
+ stdout = 'stdout dummy'
+ stderr = 'stderr dummy'
+ expected_msg = self.test_data.sheepdog_cmd_error(
+ cmd=cmd, exit_code=exit_code, stdout=stdout, stderr=stderr)
+ fake_execute.side_effect = processutils.ProcessExecutionError(
+ cmd=cmd, exit_code=exit_code, stdout=stdout, stderr=stderr)
+ ex = self.assertRaises(exception.SheepdogCmdError,
+ self.client._run_dog, *args)
+ self.assertEqual(expected_msg, ex.msg)
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_check_cluster_status_success(self, fake_logger, fake_execute):
+ stdout = self.test_data.DOG_CLUSTER_RUNNING
+ stderr = ''
+ expected_cmd = ('cluster', 'info')
+ fake_execute.return_value = (stdout, stderr)
+ self.client.check_cluster_status()
+ fake_execute.assert_called_once_with(*expected_cmd)
+ self.assertTrue(fake_logger.debug.called)
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ def test_check_cluster_status_v0_5(self, fake_execute):
+ stdout = self.test_data.COLLIE_CLUSTER_INFO_0_5
+ stderr = ''
+ fake_execute.return_value = (stdout, stderr)
+ self.client.check_cluster_status()
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ def test_check_cluster_status_v0_6(self, fake_execute):
+ stdout = self.test_data.COLLIE_CLUSTER_INFO_0_6
+ stderr = ''
+ fake_execute.return_value = (stdout, stderr)
+ self.client.check_cluster_status()
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_check_cluster_status_not_formatted(self, fake_logger,
+ fake_execute):
+ stdout = self.test_data.DOG_CLUSTER_INFO_TO_BE_FORMATTED
+ stderr = ''
+ expected_reason = _('Cluster is not formatted. '
+ 'You should probably perform '
+ '"dog cluster format".')
+ fake_execute.return_value = (stdout, stderr)
+ ex = self.assertRaises(exception.SheepdogError,
+ self.client.check_cluster_status)
+ self.assertEqual(expected_reason, ex.kwargs['reason'])
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_check_cluster_status_waiting_to_join_cluster(self, fake_logger,
+ fake_execute):
+ stdout = self.test_data.DOG_CLUSTER_INFO_WAITING_OTHER_NODES
+ stderr = ''
+ expected_reason = _('Waiting for all nodes to join cluster. '
+ 'Ensure all sheep daemons are running.')
+ fake_execute.return_value = (stdout, stderr)
+ ex = self.assertRaises(exception.SheepdogError,
+ self.client.check_cluster_status)
+ self.assertEqual(expected_reason, ex.kwargs['reason'])
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_check_cluster_status_shutting_down(self, fake_logger,
+ fake_execute):
+ stdout = self.test_data.DOG_CLUSTER_INFO_SHUTTING_DOWN
+ stderr = ''
+ expected_reason = _('Invalid sheepdog cluster status.')
+ fake_execute.return_value = (stdout, stderr)
+ ex = self.assertRaises(exception.SheepdogError,
+ self.client.check_cluster_status)
+ self.assertEqual(expected_reason, ex.kwargs['reason'])
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_check_cluster_status_fail_to_connect(self, fake_logger,
+ fake_execute):
+ cmd = self.test_data.CMD_DOG_CLUSTER_INFO
+ exit_code = 2
+ stdout = 'stdout_dummy'
+ stderr = self.test_data.DOG_COMMAND_ERROR_FAIL_TO_CONNECT
+ expected_msg = self.test_data.sheepdog_cmd_error(cmd=cmd,
+ exit_code=exit_code,
+ stdout=stdout,
+ stderr=stderr)
+ fake_execute.side_effect = exception.SheepdogCmdError(
+ cmd=cmd, exit_code=exit_code, stdout=stdout.replace('\n', '\\n'),
+ stderr=stderr.replace('\n', '\\n'))
+ ex = self.assertRaises(exception.SheepdogCmdError,
+ self.client.check_cluster_status)
+ self.assertEqual(expected_msg, ex.msg)
+ self.assertTrue(fake_logger.error.called)
+
+ @mock.patch.object(sheepdog.SheepdogClient, '_run_dog')
+ @mock.patch.object(sheepdog, 'LOG')
+ def test_check_cluster_status_unknown_error(self, fake_logger,
+ fake_execute):
+ cmd = self.test_data.CMD_DOG_CLUSTER_INFO
+ exit_code = 2
+ stdout = 'stdout_dummy'
+ stderr = 'stdout_dummy'
+ expected_msg = self.test_data.sheepdog_cmd_error(cmd=cmd,
+ exit_code=exit_code,
+ stdout=stdout,
+ stderr=stderr)
+ fake_execute.side_effect = exception.SheepdogCmdError(
+ cmd=cmd, exit_code=exit_code, stdout=stdout, stderr=stderr)
+ ex = self.assertRaises(exception.SheepdogCmdError,
+ self.client.check_cluster_status)
+ self.assertEqual(expected_msg, ex.msg)
+
+
+class SheepdogDriverTestCase(test.TestCase):
+ def setUp(self):
+ super(SheepdogDriverTestCase, self).setUp()
+ self._cfg = conf.Configuration(None)
+ self._cfg.sheepdog_store_address = SHEEP_ADDR
+ self._cfg.sheepdog_store_port = SHEEP_PORT
+ self.driver = sheepdog.SheepdogDriver(configuration=self._cfg)
db_driver = self.driver.configuration.db_driver
self.db = importutils.import_module(db_driver)
self.driver.db = self.db
self.driver.do_setup(None)
+ self.test_data = SheepdogDriverTestDataGenerator()
+ self.client = self.driver.client
+
+ @mock.patch.object(sheepdog.SheepdogClient, 'check_cluster_status')
+ def test_check_for_setup_error(self, fake_execute):
+ self.driver.check_for_setup_error()
+ fake_execute.assert_called_once_with()
def test_update_volume_stats(self):
def fake_stats(*args):
- return COLLIE_NODE_INFO, ''
+ return self.test_data.COLLIE_NODE_INFO, ''
self.stubs.Set(self.driver, '_execute', fake_stats)
expected = dict(
volume_backend_name='sheepdog',
actual = self.driver.get_volume_stats(True)
self.assertDictMatch(expected, actual)
- def test_check_for_setup_error_0_5(self):
- def fake_stats(*args):
- return COLLIE_CLUSTER_INFO_0_5, ''
- self.stubs.Set(self.driver, '_execute', fake_stats)
- self.driver.check_for_setup_error()
-
- def test_check_for_setup_error_0_6(self):
- def fake_stats(*args):
- return COLLIE_CLUSTER_INFO_0_6, ''
- self.stubs.Set(self.driver, '_execute', fake_stats)
- self.driver.check_for_setup_error()
-
def test_copy_image_to_volume(self):
@contextlib.contextmanager
def fake_temp_file():
# Copyright 2012 OpenStack Foundation
# Copyright (c) 2013 Zelin.io
+# Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
SheepDog Volume Driver.
"""
+import errno
import eventlet
import io
import re
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
+from oslo_utils import excutils
from oslo_utils import units
from cinder import exception
from cinder.i18n import _, _LE
from cinder.image import image_utils
+from cinder import utils
from cinder.volume import driver
LOG = logging.getLogger(__name__)
+sheepdog_opts = [
+ cfg.StrOpt('sheepdog_store_address',
+ default='127.0.0.1',
+ help=('IP address of sheep daemon.')),
+ cfg.IntOpt('sheepdog_store_port',
+ min=1, max=65535,
+ default=7000,
+ help=('Port of sheep daemon.'))
+]
+
CONF = cfg.CONF
CONF.import_opt("image_conversion_dir", "cinder.image.image_utils")
+CONF.register_opts(sheepdog_opts)
+
+
+class SheepdogClient(object):
+ """Sheepdog command executor."""
+ DOG_RESP_CONNECTION_ERROR = 'failed to connect to'
+ DOG_RESP_CLUSTER_RUNNING = 'Cluster status: running'
+ DOG_RESP_CLUSTER_NOT_FORMATTED = ('Cluster status: '
+ 'Waiting for cluster to be formatted')
+ DOG_RESP_CLUSTER_WAITING = ('Cluster status: '
+ 'Waiting for other nodes to join cluster')
+
+ def __init__(self, addr, port):
+ self.addr = addr
+ self.port = port
+
+ def _run_dog(self, command, subcommand, *params):
+ cmd = ('env', 'LC_ALL=C', 'LANG=C', 'dog', command, subcommand,
+ '-a', self.addr, '-p', str(self.port)) + params
+ try:
+ return utils.execute(*cmd)
+ except OSError as e:
+ with excutils.save_and_reraise_exception():
+ if e.errno == errno.ENOENT:
+ msg = _LE('Sheepdog is not installed. '
+ 'OSError: command is %s.')
+ else:
+ msg = _LE('OSError: command is %s.')
+ LOG.error(msg, cmd)
+ except processutils.ProcessExecutionError as e:
+ raise exception.SheepdogCmdError(
+ cmd=e.cmd,
+ exit_code=e.exit_code,
+ stdout=e.stdout.replace('\n', '\\n'),
+ stderr=e.stderr.replace('\n', '\\n'))
+
+ def check_cluster_status(self):
+ try:
+ (_stdout, _stderr) = self._run_dog('cluster', 'info')
+ except exception.SheepdogCmdError as e:
+ cmd = e.kwargs['cmd']
+ _stderr = e.kwargs['stderr']
+ with excutils.save_and_reraise_exception():
+ if _stderr.startswith(self.DOG_RESP_CONNECTION_ERROR):
+ msg = _LE('Failed to connect sheep daemon. '
+ 'addr: %(addr)s, port: %(port)s')
+ LOG.error(msg, {'addr': self.addr, 'port': self.port})
+ else:
+ LOG.error(_LE('Failed to check cluster status.'
+ '(command: %s)'), cmd)
+
+ if _stdout.startswith(self.DOG_RESP_CLUSTER_RUNNING):
+ LOG.debug('Sheepdog cluster is running.')
+ return
+
+ reason = _('Invalid sheepdog cluster status.')
+ if _stdout.startswith(self.DOG_RESP_CLUSTER_NOT_FORMATTED):
+ reason = _('Cluster is not formatted. '
+ 'You should probably perform "dog cluster format".')
+ elif _stdout.startswith(self.DOG_RESP_CLUSTER_WAITING):
+ reason = _('Waiting for all nodes to join cluster. '
+ 'Ensure all sheep daemons are running.')
+ raise exception.SheepdogError(reason=reason)
class SheepdogIOWrapper(io.RawIOBase):
def __init__(self, *args, **kwargs):
super(SheepdogDriver, self).__init__(*args, **kwargs)
+ self.client = SheepdogClient(CONF.sheepdog_store_address,
+ CONF.sheepdog_store_port)
self.stats_pattern = re.compile(r'[\w\s%]*Total\s(\d+)\s(\d+)*')
self._stats = {}
def check_for_setup_error(self):
- """Return error if prerequisites aren't met."""
- try:
- # NOTE(francois-charlier) Since 0.24 'collie cluster info -r'
- # gives short output, but for compatibility reason we won't
- # use it and just check if 'running' is in the output.
- (out, _err) = self._execute('collie', 'cluster', 'info')
- if 'status: running' not in out:
- exception_message = (_("Sheepdog is not working: %s") % out)
- raise exception.VolumeBackendAPIException(
- data=exception_message)
-
- except processutils.ProcessExecutionError:
- exception_message = _("Sheepdog is not working")
- raise exception.VolumeBackendAPIException(data=exception_message)
+ self.client.check_cluster_status()
def _is_cloneable(self, image_location, image_meta):
"""Check the image can be clone or not."""