--- /dev/null
+# Copyright (c) 2015 Coho Data, Inc.
+# All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import binascii
+import errno
+import mock
+import os
+import six
+import socket
+import xdrlib
+from cinder import exception
+from cinder import test
+from cinder.volume import configuration as conf
+from cinder.volume.drivers import coho
+from cinder.volume.drivers import nfs
+ADDR = 'coho-datastream-addr'
+PATH = '/test/path'
+RPC_PORT = 2049
+LOCAL_PATH = '/opt/cinder/mnt/test/path'
+ 'name': 'volume-bcc48c61-9691-4e5f-897c-793686093190',
+ 'volume_id': 'bcc48c61-9691-4e5f-897c-793686093190',
+ 'size': 128,
+ 'volume_type': 'silver',
+ 'volume_type_id': 'test',
+ 'metadata': [{'key': 'type',
+ 'service_label': 'silver'}],
+ 'provider_location': None,
+ 'id': 'bcc48c61-9691-4e5f-897c-793686093190',
+ 'status': 'available',
+CLONE_VOL['size'] = 256
+ 'name': 'snapshot-51dd4-8d8a-4aa9-9176-086c9d89e7fc',
+ 'id': '51dd4-8d8a-4aa9-9176-086c9d89e7fc',
+ 'size': 128,
+ 'volume_type': None,
+ 'provider_location': None,
+ 'volume_size': 128,
+ 'volume_name': 'volume-bcc48c61-9691-4e5f-897c-793686093190',
+ 'volume_id': 'bcc48c61-9691-4e5f-897c-793686093191',
+INVALID_SNAPSHOT['name'] = ''
+INVALID_HEADER_BIN = binascii.unhexlify('800000')
+NO_REPLY_BIN = binascii.unhexlify(
+ 'aaaaa01000000010000000000000000000000003')
+MSG_DENIED_BIN = binascii.unhexlify(
+ '00000a010000000110000000000000000000000000000003')
+PROC_UNAVAIL_BIN = binascii.unhexlify(
+ '00000a010000000100000000000000000000000000000003')
+PROG_UNAVAIL_BIN = binascii.unhexlify(
+ '000003c70000000100000000000000000000000000000001')
+PROG_MISMATCH_BIN = binascii.unhexlify(
+ '00000f7700000001000000000000000000000000000000020000000100000001')
+GARBAGE_ARGS_BIN = binascii.unhexlify(
+ '00000d6e0000000100000000000000000000000000000004')
+class CohoDriverTest(test.TestCase):
+ """Test Coho Data's NFS volume driver."""
+ def __init__(self, *args, **kwargs):
+ super(CohoDriverTest, self).__init__(*args, **kwargs)
+ def setUp(self):
+ super(CohoDriverTest, self).setUp()
+ self.context = mock.Mock()
+ self.configuration = mock.Mock(spec=conf.Configuration)
+ self.configuration.max_over_subscription_ratio = 20.0
+ self.configuration.reserved_percentage = 0
+ self.configuration.volume_backend_name = 'coho-1'
+ self.configuration.coho_rpc_port = 2049
+ self.configuration.nfs_shares_config = '/etc/cinder/coho_shares'
+ self.configuration.nfs_sparsed_volumes = True
+ self.configuration.nfs_mount_point_base = '/opt/stack/cinder/mnt'
+ self.configuration.nfs_mount_options = None
+ self.configuration.nas_ip = None
+ self.configuration.nas_share_path = None
+ self.configuration.nas_mount_options = None
+ self.configuration.nfs_used_ratio = .95
+ self.configuration.nfs_oversub_ratio = 1.0
+ def test_setup_failure_when_rpc_port_unconfigured(self):
+ self.configuration.coho_rpc_port = None
+ drv = coho.CohoDriver(configuration=self.configuration)
+ self.mock_object(coho, 'LOG')
+ self.mock_object(nfs.NfsDriver, 'do_setup')
+ with self.assertRaisesRegex(exception.CohoException,
+ ".*Coho rpc port is not configured.*"):
+ drv.do_setup(self.context)
+ self.assertTrue(coho.LOG.warning.called)
+ self.assertTrue(nfs.NfsDriver.do_setup.called)
+ def test_setup_failure_when_coho_rpc_port_is_invalid(self):
+ self.configuration.coho_rpc_port = 99999
+ drv = coho.CohoDriver(configuration=self.configuration)
+ self.mock_object(coho, 'LOG')
+ self.mock_object(nfs.NfsDriver, 'do_setup')
+ with self.assertRaisesRegex(exception.CohoException,
+ "Invalid port number.*"):
+ drv.do_setup(self.context)
+ self.assertTrue(coho.LOG.warning.called)
+ self.assertTrue(nfs.NfsDriver.do_setup.called)
+ def test_create_snapshot(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
+ mock_get_volume_location = self.mock_object(coho.CohoDriver,
+ '_get_volume_location')
+ mock_get_volume_location.return_value = ADDR, PATH
+ drv.create_snapshot(SNAPSHOT)
+ mock_get_volume_location.assert_has_calls(
+ [mock.call(SNAPSHOT['volume_id'])])
+ mock_rpc_client.assert_has_calls(
+ [mock.call(ADDR, self.configuration.coho_rpc_port),
+ mock.call().create_snapshot(
+ os.path.join(PATH, SNAPSHOT['volume_name']),
+ SNAPSHOT['name'], 0)])
+ def test_delete_snapshot(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
+ mock_get_volume_location = self.mock_object(coho.CohoDriver,
+ '_get_volume_location')
+ mock_get_volume_location.return_value = ADDR, PATH
+ drv.delete_snapshot(SNAPSHOT)
+ mock_get_volume_location.assert_has_calls(
+ [mock.call(SNAPSHOT['volume_id'])])
+ mock_rpc_client.assert_has_calls(
+ [mock.call(ADDR, self.configuration.coho_rpc_port),
+ mock.call().delete_snapshot(SNAPSHOT['name'])])
+ def test_create_volume_from_snapshot(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
+ mock_find_share = self.mock_object(drv, '_find_share')
+ mock_find_share.return_value = ADDR + ':' + PATH
+ drv.create_volume_from_snapshot(VOLUME, SNAPSHOT)
+ mock_find_share.assert_has_calls(
+ [mock.call(VOLUME['size'])])
+ mock_rpc_client.assert_has_calls(
+ [mock.call(ADDR, self.configuration.coho_rpc_port),
+ mock.call().create_volume_from_snapshot(
+ SNAPSHOT['name'], os.path.join(PATH, VOLUME['name']))])
+ def test_create_cloned_volume(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ mock_find_share = self.mock_object(drv, '_find_share')
+ mock_find_share.return_value = ADDR + ':' + PATH
+ mock_execute = self.mock_object(drv, '_execute')
+ mock_local_path = self.mock_object(drv, 'local_path')
+ mock_local_path.return_value = LOCAL_PATH
+ drv.create_cloned_volume(VOLUME, CLONE_VOL)
+ mock_find_share.assert_has_calls(
+ [mock.call(VOLUME['size'])])
+ mock_local_path.assert_has_calls(
+ [mock.call(VOLUME), mock.call(CLONE_VOL)])
+ mock_execute.assert_has_calls(
+ [mock.call('cp', LOCAL_PATH, LOCAL_PATH, run_as_root=True)])
+ def test_extend_volume(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ mock_execute = self.mock_object(drv, '_execute')
+ mock_local_path = self.mock_object(drv, 'local_path')
+ mock_local_path.return_value = LOCAL_PATH
+ drv.extend_volume(VOLUME, 512)
+ mock_local_path.assert_has_calls(
+ [mock.call(VOLUME)])
+ mock_execute.assert_has_calls(
+ [mock.call('truncate', '-s', '512G',
+ LOCAL_PATH, run_as_root=True)])
+ def test_snapshot_failure_when_source_does_not_exist(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ self.mock_object(coho.Client, '_make_call')
+ mock_init_socket = self.mock_object(coho.Client, 'init_socket')
+ mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint')
+ mock_unpack_uint.return_value = errno.ENOENT
+ mock_get_volume_location = self.mock_object(coho.CohoDriver,
+ '_get_volume_location')
+ mock_get_volume_location.return_value = ADDR, PATH
+ with self.assertRaisesRegex(exception.CohoException,
+ "No such file or directory.*"):
+ drv.create_snapshot(SNAPSHOT)
+ self.assertTrue(mock_init_socket.called)
+ self.assertTrue(mock_unpack_uint.called)
+ mock_get_volume_location.assert_has_calls(
+ [mock.call(SNAPSHOT['volume_id'])])
+ def test_snapshot_failure_with_invalid_input(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ self.mock_object(coho.Client, '_make_call')
+ mock_init_socket = self.mock_object(coho.Client, 'init_socket')
+ mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint')
+ mock_unpack_uint.return_value = errno.EINVAL
+ mock_get_volume_location = self.mock_object(coho.CohoDriver,
+ '_get_volume_location')
+ mock_get_volume_location.return_value = ADDR, PATH
+ with self.assertRaisesRegex(exception.CohoException,
+ "Invalid argument"):
+ drv.delete_snapshot(INVALID_SNAPSHOT)
+ self.assertTrue(mock_init_socket.called)
+ self.assertTrue(mock_unpack_uint.called)
+ mock_get_volume_location.assert_has_calls(
+ [mock.call(INVALID_SNAPSHOT['volume_id'])])
+ def test_snapshot_failure_when_remote_is_unreachable(self):
+ drv = coho.CohoDriver(configuration=self.configuration)
+ mock_get_volume_location = self.mock_object(coho.CohoDriver,
+ '_get_volume_location')
+ mock_get_volume_location.return_value = 'uknown-address', PATH
+ with self.assertRaisesRegex(exception.CohoException,
+ "Failed to establish connection.*"):
+ drv.create_snapshot(SNAPSHOT)
+ mock_get_volume_location.assert_has_calls(
+ [mock.call(INVALID_SNAPSHOT['volume_id'])])
+ def test_rpc_client_make_call_proper_order(self):
+ """This test ensures that the RPC client logic is correct.
+ When the RPC client's make_call function is called it creates
+ a packet and sends it to the Coho cluster RPC server. This test
+ ensures that the functions needed to complete the process are
+ called in the proper order with valid arguments.
+ """
+ mock_packer = self.mock_object(xdrlib, 'Packer')
+ mock_unpacker = self.mock_object(xdrlib, 'Unpacker')
+ mock_unpacker.return_value.unpack_uint.return_value = 0
+ mock_socket = self.mock_object(socket, 'socket')
+ mock_init_call = self.mock_object(coho.Client, 'init_call')
+ mock_init_call.return_value = (1, 2)
+ mock_sendrecord = self.mock_object(coho.Client, '_sendrecord')
+ mock_recvrecord = self.mock_object(coho.Client, '_recvrecord')
+ mock_recvrecord.return_value = 'test_reply'
+ mock_unpack_replyheader = self.mock_object(coho.Client,
+ 'unpack_replyheader')
+ mock_unpack_replyheader.return_value = (123, 1)
+ rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
+ rpc_client.create_volume_from_snapshot('src', 'dest')
+ self.assertTrue(mock_sendrecord.called)
+ self.assertTrue(mock_unpack_replyheader.called)
+ mock_packer.assert_has_calls([mock.call().reset()])
+ mock_unpacker.assert_has_calls(
+ [mock.call().reset('test_reply'),
+ mock.call().unpack_uint()])
+ mock_socket.assert_has_calls(
+ [mock.call(socket.AF_INET, socket.SOCK_STREAM),
+ mock.call().bind(('', 0)),
+ mock.call().connect((ADDR, RPC_PORT))])
+ mock_init_call.assert_has_calls(
+ [(six.b('src'), mock_packer().pack_string),
+ (six.b('dest'), mock_packer().pack_string)])])
+ def test_rpc_client_error_in_reply_header(self):
+ """Ensure excpetions in reply header are raised by the RPC client.
+ Coho cluster's RPC server packs errors into the reply header.
+ This test ensures that the RPC client parses the reply header
+ correctly and raises exceptions on various errors that can be
+ included in the reply header.
+ """
+ mock_socket = self.mock_object(socket, 'socket')
+ mock_recvrecord = self.mock_object(coho.Client, '_recvrecord')
+ rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
+ mock_recvrecord.return_value = NO_REPLY_BIN
+ with self.assertRaisesRegex(exception.CohoException,
+ "no REPLY.*"):
+ rpc_client.create_snapshot('src', 'dest', 0)
+ mock_recvrecord.return_value = MSG_DENIED_BIN
+ with self.assertRaisesRegex(exception.CohoException,
+ ".*MSG_DENIED.*"):
+ rpc_client.delete_snapshot('snapshot')
+ mock_recvrecord.return_value = PROG_UNAVAIL_BIN
+ with self.assertRaisesRegex(exception.CohoException,
+ rpc_client.delete_snapshot('snapshot')
+ mock_recvrecord.return_value = PROG_MISMATCH_BIN
+ with self.assertRaisesRegex(exception.CohoException,
+ rpc_client.delete_snapshot('snapshot')
+ mock_recvrecord.return_value = GARBAGE_ARGS_BIN
+ with self.assertRaisesRegex(exception.CohoException,
+ rpc_client.delete_snapshot('snapshot')
+ mock_recvrecord.return_value = PROC_UNAVAIL_BIN
+ with self.assertRaisesRegex(exception.CohoException,
+ rpc_client.delete_snapshot('snapshot')
+ self.assertTrue(mock_recvrecord.called)
+ mock_socket.assert_has_calls(
+ [mock.call(socket.AF_INET, socket.SOCK_STREAM),
+ mock.call().bind(('', 0)),
+ mock.call().connect((ADDR, RPC_PORT))])
+ def test_rpc_client_error_in_receive_fragment(self):
+ """Ensure exception is raised when malformed packet is recieved."""
+ mock_sendrcd = self.mock_object(coho.Client, '_sendrecord')
+ mock_socket = self.mock_object(socket, 'socket')
+ mock_socket.return_value.recv.return_value = INVALID_HEADER_BIN
+ rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
+ with self.assertRaisesRegex(exception.CohoException,
+ "Invalid response header.*"):
+ rpc_client.create_snapshot('src', 'dest', 0)
+ self.assertTrue(mock_sendrcd.called)
+ mock_socket.assert_has_calls(
+ [mock.call(socket.AF_INET, socket.SOCK_STREAM),
+ mock.call().bind(('', 0)),
+ mock.call().connect((ADDR, RPC_PORT)),
+ mock.call().recv(4)])
--- /dev/null
+# Copyright (c) 2015 Coho Data, Inc.
+# All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import os
+import six
+import socket
+import xdrlib
+from oslo_config import cfg
+from oslo_log import log as logging
+from random import randint
+from cinder import exception
+from cinder.i18n import _
+from cinder.volume.drivers import nfs
+# RPC Definition
+CALL = 0
+REPLY = 1
+COHO_PROGRAM = 400115
+COHO_V1 = 1
+# Simple RPC Client
+def make_auth_null():
+ return six.b('')
+class Client(object):
+ def __init__(self, address, prog, vers, port):
+ self.packer = xdrlib.Packer()
+ self.unpacker = xdrlib.Unpacker('')
+ self.address = address
+ self.prog = prog
+ self.vers = vers
+ self.port = port
+ self.cred = None
+ self.verf = None
+ self.init_socket()
+ self.init_xid()
+ def init_socket(self):
+ try:
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.bind(('', 0))
+ self.sock.connect((self.address, self.port))
+ except socket.error:
+ msg = _('Failed to establish connection with Coho cluster')
+ raise exception.CohoException(msg)
+ def init_xid(self):
+ self.xid = randint(0, 4096)
+ def make_xid(self):
+ self.xid += 1
+ def make_cred(self):
+ if self.cred is None:
+ self.cred = (AUTH_NULL, make_auth_null())
+ return self.cred
+ def make_verf(self):
+ if self.verf is None:
+ self.verf = (AUTH_NULL, make_auth_null())
+ return self.verf
+ def pack_auth(self, auth):
+ flavor, stuff = auth
+ self.packer.pack_enum(flavor)
+ self.packer.pack_opaque(stuff)
+ def pack_callheader(self, xid, prog, vers, proc, cred, verf):
+ self.packer.pack_uint(xid)
+ self.packer.pack_enum(CALL)
+ self.packer.pack_uint(RPCVERSION)
+ self.packer.pack_uint(prog)
+ self.packer.pack_uint(vers)
+ self.packer.pack_uint(proc)
+ self.pack_auth(cred)
+ self.pack_auth(verf)
+ def unpack_auth(self):
+ flavor = self.unpacker.unpack_enum()
+ stuff = self.unpacker.unpack_opaque()
+ return (flavor, stuff)
+ def unpack_replyheader(self):
+ xid = self.unpacker.unpack_uint()
+ mtype = self.unpacker.unpack_enum()
+ if mtype != REPLY:
+ raise exception.CohoException(
+ _('no REPLY but %r') % (mtype,))
+ stat = self.unpacker.unpack_enum()
+ if stat == MSG_DENIED:
+ stat = self.unpacker.unpack_enum()
+ if stat == RPC_MISMATCH:
+ low = self.unpacker.unpack_uint()
+ high = self.unpacker.unpack_uint()
+ raise exception.CohoException(
+ _('MSG_DENIED: RPC_MISMATCH: %r') % ((low, high),))
+ if stat == AUTH_ERROR:
+ stat = self.unpacker.unpack_uint()
+ raise exception.CohoException(
+ _('MSG_DENIED: AUTH_ERROR: %r') % (stat,))
+ raise exception.CohoException(_('MSG_DENIED: %r') % (stat,))
+ if stat != MSG_ACCEPTED:
+ raise exception.CohoException(
+ _('Neither MSG_DENIED nor MSG_ACCEPTED: %r') % (stat,))
+ verf = self.unpack_auth()
+ stat = self.unpacker.unpack_enum()
+ if stat == PROG_UNAVAIL:
+ raise exception.CohoException(_('call failed: PROG_UNAVAIL'))
+ if stat == PROG_MISMATCH:
+ low = self.unpacker.unpack_uint()
+ high = self.unpacker.unpack_uint()
+ raise exception.CohoException(
+ _('call failed: PROG_MISMATCH: %r') % ((low, high),))
+ if stat == PROC_UNAVAIL:
+ raise exception.CohoException(_('call failed: PROC_UNAVAIL'))
+ if stat == GARBAGE_ARGS:
+ raise exception.CohoException(_('call failed: GARBAGE_ARGS'))
+ if stat != SUCCESS:
+ raise exception.CohoException(_('call failed: %r') % (stat,))
+ return xid, verf
+ def init_call(self, proc, args):
+ self.make_xid()
+ self.packer.reset()
+ cred = self.make_cred()
+ verf = self.make_verf()
+ self.pack_callheader(self.xid, self.prog, self.vers, proc, cred, verf)
+ for arg, func in args:
+ func(arg)
+ return self.xid, self.packer.get_buf()
+ def _sendfrag(self, last, frag):
+ x = len(frag)
+ if last:
+ x = x | 0x80000000
+ header = (six.int2byte(int(x >> 24 & 0xff)) +
+ six.int2byte(int(x >> 16 & 0xff)) +
+ six.int2byte(int(x >> 8 & 0xff)) +
+ six.int2byte(int(x & 0xff)))
+ self.sock.send(header + frag)
+ def _sendrecord(self, record):
+ self._sendfrag(1, record)
+ def _recvfrag(self):
+ header = self.sock.recv(4)
+ if len(header) < 4:
+ raise exception.CohoException(
+ _('Invalid response header from RPC server'))
+ x = (six.indexbytes(header, 0) << 24 |
+ six.indexbytes(header, 1) << 16 |
+ six.indexbytes(header, 2) << 8 |
+ six.indexbytes(header, 3))
+ last = ((x & 0x80000000) != 0)
+ n = int(x & 0x7fffffff)
+ frag = six.b('')
+ while n > 0:
+ buf = self.sock.recv(n)
+ if not buf:
+ raise exception.CohoException(
+ _('RPC server response is incomplete'))
+ n = n - len(buf)
+ frag = frag + buf
+ return last, frag
+ def _recvrecord(self):
+ record = six.b('')
+ last = 0
+ while not last:
+ last, frag = self._recvfrag()
+ record = record + frag
+ return record
+ def _make_call(self, proc, args):
+ self.packer.reset()
+ xid, call = self.init_call(proc, args)
+ self._sendrecord(call)
+ reply = self._recvrecord()
+ self.unpacker.reset(reply)
+ xid, verf = self.unpack_replyheader()
+ def _call(self, proc, args):
+ self._make_call(proc, args)
+ res = self.unpacker.unpack_uint()
+ if res != SUCCESS:
+ raise exception.CohoException(os.strerror(res))
+class CohoRPCClient(Client):
+ def __init__(self, address, port):
+ Client.__init__(self, address, COHO_PROGRAM, 1, port)
+ def create_snapshot(self, src, dst, flags):
+ [(six.b(src), self.packer.pack_string),
+ (six.b(dst), self.packer.pack_string),
+ (flags, self.packer.pack_uint)])
+ def delete_snapshot(self, name):
+ [(six.b(name), self.packer.pack_string)])
+ def create_volume_from_snapshot(self, src, dst):
+ [(six.b(src), self.packer.pack_string),
+ (six.b(dst), self.packer.pack_string)])
+# Coho Data Volume Driver
+VERSION = '1.0.0'
+LOG = logging.getLogger(__name__)
+coho_opts = [
+ cfg.IntOpt('coho_rpc_port',
+ default=2049,
+ help='RPC port to connect to Coha Data MicroArray')
+CONF = cfg.CONF
+class CohoDriver(nfs.NfsDriver):
+ """Coho Data NFS based cinder driver.
+ Creates file on NFS share for using it as block device on hypervisor.
+ Version history:
+ 1.0.0 - Initial driver
+ """
+ # We have to overload this attribute of RemoteFSDriver because
+ # unfortunately the base method doesn't accept exports of the form:
+ # <address>:/
+ # It expects a non blank export name following the /.
+ # We are more permissive.
+ SHARE_FORMAT_REGEX = r'.+:/.*'
+ def __init__(self, *args, **kwargs):
+ super(CohoDriver, self).__init__(*args, **kwargs)
+ self.configuration.append_config_values(coho_opts)
+ self._execute_as_root = True
+ self._rpcclients = dict()
+ self._backend_name = (self.configuration.volume_backend_name or
+ self.__class__.__name__)
+ def _init_rpcclient(self, addr, port):
+ client = CohoRPCClient(addr, port)
+ self._rpcclients[(addr, port)] = client
+ return client
+ def _get_rpcclient(self, addr, port):
+ if (addr, port) in self._rpcclients:
+ return self._rpcclients[(addr, port)]
+ return self._init_rpcclient(addr, port)
+ def do_setup(self, context):
+ """Any initialization the volume driver does while starting."""
+ super(CohoDriver, self).do_setup(context)
+ self._context = context
+ config = self.configuration.coho_rpc_port
+ if not config:
+ msg = _("Coho rpc port is not configured")
+ LOG.warning(msg)
+ raise exception.CohoException(msg)
+ if config < 1 or config > 65535:
+ msg = (_("Invalid port number %(config)s for Coho rpc port") %
+ {'config': config})
+ LOG.warning(msg)
+ raise exception.CohoException(msg)
+ def _do_clone_volume(self, volume, src):
+ """Clone volume to source.
+ Create a volume on given remote share with the same contents
+ as the specified source.
+ """
+ volume_path = self.local_path(volume)
+ source_path = self.local_path(src)
+ self._execute('cp', source_path, volume_path,
+ run_as_root=self._execute_as_root)
+ def _get_volume_location(self, volume_id):
+ """Returns provider location for given volume."""
+ # The driver should not directly access db, but since volume is not
+ # passed in create_snapshot and delete_snapshot we are forced to read
+ # the volume info from the database
+ volume = self.db.volume_get(self._context, volume_id)
+ addr, path = volume.provider_location.split(":")
+ return addr, path
+ def create_snapshot(self, snapshot):
+ """Create a volume snapshot."""
+ addr, path = self._get_volume_location(snapshot['volume_id'])
+ volume_path = os.path.join(path, snapshot['volume_name'])
+ snapshot_name = snapshot['name']
+ flags = 0 # unused at this time
+ client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
+ client.create_snapshot(volume_path, snapshot_name, flags)
+ def delete_snapshot(self, snapshot):
+ """Delete a volume snapshot."""
+ addr, path = self._get_volume_location(snapshot['volume_id'])
+ snapshot_name = snapshot['name']
+ client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
+ client.delete_snapshot(snapshot_name)
+ def create_volume_from_snapshot(self, volume, snapshot):
+ """Create a volume from a snapshot."""
+ volume['provider_location'] = self._find_share(volume['size'])
+ addr, path = volume['provider_location'].split(":")
+ volume_path = os.path.join(path, volume['name'])
+ snapshot_name = snapshot['name']
+ client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
+ client.create_volume_from_snapshot(snapshot_name, volume_path)
+ return {'provider_location': volume['provider_location']}
+ def _extend_file_sparse(self, path, size):
+ """Extend the size of a file (with no additional disk usage)."""
+ self._execute('truncate', '-s', '%sG' % size,
+ path, run_as_root=self._execute_as_root)
+ def create_cloned_volume(self, volume, src_vref):
+ volume['provider_location'] = self._find_share(volume['size'])
+ self._do_clone_volume(volume, src_vref)
+ def extend_volume(self, volume, new_size):
+ """Extend the specified file to the new_size (sparsely)."""
+ volume_path = self.local_path(volume)
+ self._extend_file_sparse(volume_path, new_size)
+ def get_volume_stats(self, refresh):
+ """Pass in Coho Data information in volume stats."""
+ _stats = super(CohoDriver, self).get_volume_stats(refresh)
+ _stats["vendor_name"] = 'Coho Data'
+ _stats["driver_version"] = VERSION
+ _stats["storage_protocol"] = 'NFS'
+ _stats["volume_backend_name"] = self._backend_name
+ return _stats