]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Volume driver for Coho Data storage solutions
authorBardia Keyoumarsi <bardia.keyoumarsi@cohodata.com>
Wed, 18 Nov 2015 00:07:41 +0000 (16:07 -0800)
committerBardia Keyoumarsi <bardia.keyoumarsi@cohodata.com>
Mon, 7 Dec 2015 05:31:33 +0000 (21:31 -0800)
This patch introduces Coho Data volume driver along with unit tests.

Implements: blueprint coho-cinder-driver

DocImpact
Documentation for setting up the Coho driver and enabling it
in Cinder will be provided in a separate patch.

Change-Id: I06a66d10add9132d0f3afca054d68094ddfb4da0
Signed-off-by: Bardia Keyoumarsi <bardia.keyoumarsi@cohodata.com>
cinder/exception.py
cinder/opts.py
cinder/tests/unit/test_coho.py [new file with mode: 0644]
cinder/volume/drivers/coho.py [new file with mode: 0644]
releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml [new file with mode: 0644]
tests-py3.txt

index ef50bd67bc3b97c6222cf6740d46bc8c2f16a9bd..b534453759102b64feaf1fa5e43f9cca8e1e2c98 100644 (file)
@@ -48,6 +48,7 @@ CONF.register_opts(exc_log_opts)
 
 
 class ConvertedException(webob.exc.WSGIHTTPException):
+
     def __init__(self, code=500, title="", explanation=""):
         self.code = code
         # There is a strict rule about constructing status line for HTTP:
@@ -1017,3 +1018,8 @@ class NotSupportedOperation(Invalid):
 # Hitachi HNAS drivers
 class HNASConnError(CinderException):
     message = _("%(message)s")
+
+
+# Coho drivers
+class CohoException(VolumeDriverException):
+    message = _("Coho Data Cinder driver failure: %(message)s")
index 8d65f80314b1a74e8f9adb91b8127d47e52ecffc..a4864e8f733103468b7ce6678378e6ec154c41a5 100644 (file)
@@ -64,6 +64,7 @@ from cinder.volume.drivers import blockbridge as \
     cinder_volume_drivers_blockbridge
 from cinder.volume.drivers.cloudbyte import options as \
     cinder_volume_drivers_cloudbyte_options
+from cinder.volume.drivers import coho as cinder_volume_drivers_coho
 from cinder.volume.drivers import datera as cinder_volume_drivers_datera
 from cinder.volume.drivers.dell import dell_storagecenter_common as \
     cinder_volume_drivers_dell_dellstoragecentercommon
@@ -238,6 +239,7 @@ def list_opts():
                 cinder_db_api.db_opts,
                 cinder_scheduler_weights_volumenumber.
                 volume_number_weight_opts,
+                cinder_volume_drivers_coho.coho_opts,
                 cinder_volume_drivers_xio.XIO_OPTS,
                 cinder_volume_drivers_zfssa_zfssaiscsi.ZFSSA_OPTS,
                 cinder_volume_driver.volume_opts,
diff --git a/cinder/tests/unit/test_coho.py b/cinder/tests/unit/test_coho.py
new file mode 100644 (file)
index 0000000..a0c724d
--- /dev/null
@@ -0,0 +1,376 @@
+# Copyright (c) 2015 Coho Data, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+import binascii
+import errno
+import mock
+import os
+import six
+import socket
+import xdrlib
+
+from cinder import exception
+from cinder import test
+from cinder.volume import configuration as conf
+from cinder.volume.drivers import coho
+from cinder.volume.drivers import nfs
+
+ADDR = 'coho-datastream-addr'
+PATH = '/test/path'
+RPC_PORT = 2049
+LOCAL_PATH = '/opt/cinder/mnt/test/path'
+
+VOLUME = {
+    'name': 'volume-bcc48c61-9691-4e5f-897c-793686093190',
+    'volume_id': 'bcc48c61-9691-4e5f-897c-793686093190',
+    'size': 128,
+    'volume_type': 'silver',
+    'volume_type_id': 'test',
+    'metadata': [{'key': 'type',
+                  'service_label': 'silver'}],
+    'provider_location': None,
+    'id': 'bcc48c61-9691-4e5f-897c-793686093190',
+    'status': 'available',
+}
+
+CLONE_VOL = VOLUME.copy()
+CLONE_VOL['size'] = 256
+
+SNAPSHOT = {
+    'name': 'snapshot-51dd4-8d8a-4aa9-9176-086c9d89e7fc',
+    'id': '51dd4-8d8a-4aa9-9176-086c9d89e7fc',
+    'size': 128,
+    'volume_type': None,
+    'provider_location': None,
+    'volume_size': 128,
+    'volume_name': 'volume-bcc48c61-9691-4e5f-897c-793686093190',
+    'volume_id': 'bcc48c61-9691-4e5f-897c-793686093191',
+}
+
+INVALID_SNAPSHOT = SNAPSHOT.copy()
+INVALID_SNAPSHOT['name'] = ''
+
+INVALID_HEADER_BIN = binascii.unhexlify('800000')
+NO_REPLY_BIN = binascii.unhexlify(
+    'aaaaa01000000010000000000000000000000003')
+MSG_DENIED_BIN = binascii.unhexlify(
+    '00000a010000000110000000000000000000000000000003')
+PROC_UNAVAIL_BIN = binascii.unhexlify(
+    '00000a010000000100000000000000000000000000000003')
+PROG_UNAVAIL_BIN = binascii.unhexlify(
+    '000003c70000000100000000000000000000000000000001')
+PROG_MISMATCH_BIN = binascii.unhexlify(
+    '00000f7700000001000000000000000000000000000000020000000100000001')
+GARBAGE_ARGS_BIN = binascii.unhexlify(
+    '00000d6e0000000100000000000000000000000000000004')
+
+
+class CohoDriverTest(test.TestCase):
+    """Test Coho Data's NFS volume driver."""
+
+    def __init__(self, *args, **kwargs):
+        super(CohoDriverTest, self).__init__(*args, **kwargs)
+
+    def setUp(self):
+        super(CohoDriverTest, self).setUp()
+
+        self.context = mock.Mock()
+        self.configuration = mock.Mock(spec=conf.Configuration)
+        self.configuration.max_over_subscription_ratio = 20.0
+        self.configuration.reserved_percentage = 0
+        self.configuration.volume_backend_name = 'coho-1'
+        self.configuration.coho_rpc_port = 2049
+        self.configuration.nfs_shares_config = '/etc/cinder/coho_shares'
+        self.configuration.nfs_sparsed_volumes = True
+        self.configuration.nfs_mount_point_base = '/opt/stack/cinder/mnt'
+        self.configuration.nfs_mount_options = None
+        self.configuration.nas_ip = None
+        self.configuration.nas_share_path = None
+        self.configuration.nas_mount_options = None
+        self.configuration.nfs_used_ratio = .95
+        self.configuration.nfs_oversub_ratio = 1.0
+
+    def test_setup_failure_when_rpc_port_unconfigured(self):
+        self.configuration.coho_rpc_port = None
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        self.mock_object(coho, 'LOG')
+        self.mock_object(nfs.NfsDriver, 'do_setup')
+
+        with self.assertRaisesRegex(exception.CohoException,
+                                    ".*Coho rpc port is not configured.*"):
+            drv.do_setup(self.context)
+
+        self.assertTrue(coho.LOG.warning.called)
+        self.assertTrue(nfs.NfsDriver.do_setup.called)
+
+    def test_setup_failure_when_coho_rpc_port_is_invalid(self):
+        self.configuration.coho_rpc_port = 99999
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        self.mock_object(coho, 'LOG')
+        self.mock_object(nfs.NfsDriver, 'do_setup')
+
+        with self.assertRaisesRegex(exception.CohoException,
+                                    "Invalid port number.*"):
+            drv.do_setup(self.context)
+
+        self.assertTrue(coho.LOG.warning.called)
+        self.assertTrue(nfs.NfsDriver.do_setup.called)
+
+    def test_create_snapshot(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
+        mock_get_volume_location = self.mock_object(coho.CohoDriver,
+                                                    '_get_volume_location')
+        mock_get_volume_location.return_value = ADDR, PATH
+
+        drv.create_snapshot(SNAPSHOT)
+
+        mock_get_volume_location.assert_has_calls(
+            [mock.call(SNAPSHOT['volume_id'])])
+        mock_rpc_client.assert_has_calls(
+            [mock.call(ADDR, self.configuration.coho_rpc_port),
+             mock.call().create_snapshot(
+                os.path.join(PATH, SNAPSHOT['volume_name']),
+                SNAPSHOT['name'], 0)])
+
+    def test_delete_snapshot(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
+        mock_get_volume_location = self.mock_object(coho.CohoDriver,
+                                                    '_get_volume_location')
+        mock_get_volume_location.return_value = ADDR, PATH
+
+        drv.delete_snapshot(SNAPSHOT)
+
+        mock_get_volume_location.assert_has_calls(
+            [mock.call(SNAPSHOT['volume_id'])])
+        mock_rpc_client.assert_has_calls(
+            [mock.call(ADDR, self.configuration.coho_rpc_port),
+             mock.call().delete_snapshot(SNAPSHOT['name'])])
+
+    def test_create_volume_from_snapshot(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
+        mock_find_share = self.mock_object(drv, '_find_share')
+        mock_find_share.return_value = ADDR + ':' + PATH
+
+        drv.create_volume_from_snapshot(VOLUME, SNAPSHOT)
+
+        mock_find_share.assert_has_calls(
+            [mock.call(VOLUME['size'])])
+        mock_rpc_client.assert_has_calls(
+            [mock.call(ADDR, self.configuration.coho_rpc_port),
+             mock.call().create_volume_from_snapshot(
+                SNAPSHOT['name'], os.path.join(PATH, VOLUME['name']))])
+
+    def test_create_cloned_volume(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        mock_find_share = self.mock_object(drv, '_find_share')
+        mock_find_share.return_value = ADDR + ':' + PATH
+        mock_execute = self.mock_object(drv, '_execute')
+        mock_local_path = self.mock_object(drv, 'local_path')
+        mock_local_path.return_value = LOCAL_PATH
+
+        drv.create_cloned_volume(VOLUME, CLONE_VOL)
+
+        mock_find_share.assert_has_calls(
+            [mock.call(VOLUME['size'])])
+        mock_local_path.assert_has_calls(
+            [mock.call(VOLUME), mock.call(CLONE_VOL)])
+        mock_execute.assert_has_calls(
+            [mock.call('cp', LOCAL_PATH, LOCAL_PATH, run_as_root=True)])
+
+    def test_extend_volume(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        mock_execute = self.mock_object(drv, '_execute')
+        mock_local_path = self.mock_object(drv, 'local_path')
+        mock_local_path.return_value = LOCAL_PATH
+
+        drv.extend_volume(VOLUME, 512)
+
+        mock_local_path.assert_has_calls(
+            [mock.call(VOLUME)])
+        mock_execute.assert_has_calls(
+            [mock.call('truncate', '-s', '512G',
+                       LOCAL_PATH, run_as_root=True)])
+
+    def test_snapshot_failure_when_source_does_not_exist(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        self.mock_object(coho.Client, '_make_call')
+        mock_init_socket = self.mock_object(coho.Client, 'init_socket')
+        mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint')
+        mock_unpack_uint.return_value = errno.ENOENT
+        mock_get_volume_location = self.mock_object(coho.CohoDriver,
+                                                    '_get_volume_location')
+        mock_get_volume_location.return_value = ADDR, PATH
+
+        with self.assertRaisesRegex(exception.CohoException,
+                                    "No such file or directory.*"):
+            drv.create_snapshot(SNAPSHOT)
+
+        self.assertTrue(mock_init_socket.called)
+        self.assertTrue(mock_unpack_uint.called)
+        mock_get_volume_location.assert_has_calls(
+            [mock.call(SNAPSHOT['volume_id'])])
+
+    def test_snapshot_failure_with_invalid_input(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        self.mock_object(coho.Client, '_make_call')
+        mock_init_socket = self.mock_object(coho.Client, 'init_socket')
+        mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint')
+        mock_unpack_uint.return_value = errno.EINVAL
+        mock_get_volume_location = self.mock_object(coho.CohoDriver,
+                                                    '_get_volume_location')
+        mock_get_volume_location.return_value = ADDR, PATH
+
+        with self.assertRaisesRegex(exception.CohoException,
+                                    "Invalid argument"):
+            drv.delete_snapshot(INVALID_SNAPSHOT)
+
+        self.assertTrue(mock_init_socket.called)
+        self.assertTrue(mock_unpack_uint.called)
+        mock_get_volume_location.assert_has_calls(
+            [mock.call(INVALID_SNAPSHOT['volume_id'])])
+
+    def test_snapshot_failure_when_remote_is_unreachable(self):
+        drv = coho.CohoDriver(configuration=self.configuration)
+
+        mock_get_volume_location = self.mock_object(coho.CohoDriver,
+                                                    '_get_volume_location')
+        mock_get_volume_location.return_value = 'uknown-address', PATH
+
+        with self.assertRaisesRegex(exception.CohoException,
+                                    "Failed to establish connection.*"):
+            drv.create_snapshot(SNAPSHOT)
+
+        mock_get_volume_location.assert_has_calls(
+            [mock.call(INVALID_SNAPSHOT['volume_id'])])
+
+    def test_rpc_client_make_call_proper_order(self):
+        """This test ensures that the RPC client logic is correct.
+
+        When the RPC client's make_call function is called it creates
+        a packet and sends it to the Coho cluster RPC server. This test
+        ensures that the functions needed to complete the process are
+        called in the proper order with valid arguments.
+        """
+
+        mock_packer = self.mock_object(xdrlib, 'Packer')
+        mock_unpacker = self.mock_object(xdrlib, 'Unpacker')
+        mock_unpacker.return_value.unpack_uint.return_value = 0
+        mock_socket = self.mock_object(socket, 'socket')
+        mock_init_call = self.mock_object(coho.Client, 'init_call')
+        mock_init_call.return_value = (1, 2)
+        mock_sendrecord = self.mock_object(coho.Client, '_sendrecord')
+        mock_recvrecord = self.mock_object(coho.Client, '_recvrecord')
+        mock_recvrecord.return_value = 'test_reply'
+        mock_unpack_replyheader = self.mock_object(coho.Client,
+                                                   'unpack_replyheader')
+        mock_unpack_replyheader.return_value = (123, 1)
+
+        rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
+        rpc_client.create_volume_from_snapshot('src', 'dest')
+
+        self.assertTrue(mock_sendrecord.called)
+        self.assertTrue(mock_unpack_replyheader.called)
+        mock_packer.assert_has_calls([mock.call().reset()])
+        mock_unpacker.assert_has_calls(
+            [mock.call().reset('test_reply'),
+             mock.call().unpack_uint()])
+        mock_socket.assert_has_calls(
+            [mock.call(socket.AF_INET, socket.SOCK_STREAM),
+             mock.call().bind(('', 0)),
+             mock.call().connect((ADDR, RPC_PORT))])
+        mock_init_call.assert_has_calls(
+            [mock.call(coho.COHO1_CREATE_VOLUME_FROM_SNAPSHOT,
+                       [(six.b('src'), mock_packer().pack_string),
+                        (six.b('dest'), mock_packer().pack_string)])])
+
+    def test_rpc_client_error_in_reply_header(self):
+        """Ensure excpetions in reply header are raised by the RPC client.
+
+        Coho cluster's RPC server packs errors into the reply header.
+        This test ensures that the RPC client parses the reply header
+        correctly and raises exceptions on various errors that can be
+        included in the reply header.
+        """
+        mock_socket = self.mock_object(socket, 'socket')
+        mock_recvrecord = self.mock_object(coho.Client, '_recvrecord')
+        rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
+
+        mock_recvrecord.return_value = NO_REPLY_BIN
+        with self.assertRaisesRegex(exception.CohoException,
+                                    "no REPLY.*"):
+            rpc_client.create_snapshot('src', 'dest', 0)
+
+        mock_recvrecord.return_value = MSG_DENIED_BIN
+        with self.assertRaisesRegex(exception.CohoException,
+                                    ".*MSG_DENIED.*"):
+            rpc_client.delete_snapshot('snapshot')
+
+        mock_recvrecord.return_value = PROG_UNAVAIL_BIN
+        with self.assertRaisesRegex(exception.CohoException,
+                                    ".*PROG_UNAVAIL"):
+            rpc_client.delete_snapshot('snapshot')
+
+        mock_recvrecord.return_value = PROG_MISMATCH_BIN
+        with self.assertRaisesRegex(exception.CohoException,
+                                    ".*PROG_MISMATCH.*"):
+            rpc_client.delete_snapshot('snapshot')
+
+        mock_recvrecord.return_value = GARBAGE_ARGS_BIN
+        with self.assertRaisesRegex(exception.CohoException,
+                                    ".*GARBAGE_ARGS"):
+            rpc_client.delete_snapshot('snapshot')
+
+        mock_recvrecord.return_value = PROC_UNAVAIL_BIN
+        with self.assertRaisesRegex(exception.CohoException,
+                                    ".*PROC_UNAVAIL"):
+            rpc_client.delete_snapshot('snapshot')
+
+        self.assertTrue(mock_recvrecord.called)
+        mock_socket.assert_has_calls(
+            [mock.call(socket.AF_INET, socket.SOCK_STREAM),
+             mock.call().bind(('', 0)),
+             mock.call().connect((ADDR, RPC_PORT))])
+
+    def test_rpc_client_error_in_receive_fragment(self):
+        """Ensure exception is raised when malformed packet is recieved."""
+
+        mock_sendrcd = self.mock_object(coho.Client, '_sendrecord')
+        mock_socket = self.mock_object(socket, 'socket')
+        mock_socket.return_value.recv.return_value = INVALID_HEADER_BIN
+        rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
+
+        with self.assertRaisesRegex(exception.CohoException,
+                                    "Invalid response header.*"):
+            rpc_client.create_snapshot('src', 'dest', 0)
+
+        self.assertTrue(mock_sendrcd.called)
+        mock_socket.assert_has_calls(
+            [mock.call(socket.AF_INET, socket.SOCK_STREAM),
+             mock.call().bind(('', 0)),
+             mock.call().connect((ADDR, RPC_PORT)),
+             mock.call().recv(4)])
diff --git a/cinder/volume/drivers/coho.py b/cinder/volume/drivers/coho.py
new file mode 100644 (file)
index 0000000..0b2c3f8
--- /dev/null
@@ -0,0 +1,397 @@
+# Copyright (c) 2015 Coho Data, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import six
+import socket
+import xdrlib
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from random import randint
+
+from cinder import exception
+from cinder.i18n import _
+from cinder.volume.drivers import nfs
+
+#
+# RPC Definition
+#
+
+RPCVERSION = 2
+
+CALL = 0
+REPLY = 1
+
+AUTH_NULL = 0
+
+MSG_ACCEPTED = 0
+MSG_DENIED = 1
+
+SUCCESS = 0
+PROG_UNAVAIL = 1
+PROG_MISMATCH = 2
+PROC_UNAVAIL = 3
+GARBAGE_ARGS = 4
+
+RPC_MISMATCH = 0
+AUTH_ERROR = 1
+
+COHO_PROGRAM = 400115
+COHO_V1 = 1
+COHO1_CREATE_SNAPSHOT = 1
+COHO1_DELETE_SNAPSHOT = 2
+COHO1_CREATE_VOLUME_FROM_SNAPSHOT = 3
+
+#
+# Simple RPC Client
+#
+
+
+def make_auth_null():
+
+    return six.b('')
+
+
+class Client(object):
+
+    def __init__(self, address, prog, vers, port):
+        self.packer = xdrlib.Packer()
+        self.unpacker = xdrlib.Unpacker('')
+        self.address = address
+        self.prog = prog
+        self.vers = vers
+        self.port = port
+        self.cred = None
+        self.verf = None
+
+        self.init_socket()
+        self.init_xid()
+
+    def init_socket(self):
+        try:
+            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.sock.bind(('', 0))
+            self.sock.connect((self.address, self.port))
+        except socket.error:
+            msg = _('Failed to establish connection with Coho cluster')
+            raise exception.CohoException(msg)
+
+    def init_xid(self):
+        self.xid = randint(0, 4096)
+
+    def make_xid(self):
+        self.xid += 1
+
+    def make_cred(self):
+        if self.cred is None:
+            self.cred = (AUTH_NULL, make_auth_null())
+        return self.cred
+
+    def make_verf(self):
+        if self.verf is None:
+            self.verf = (AUTH_NULL, make_auth_null())
+        return self.verf
+
+    def pack_auth(self, auth):
+        flavor, stuff = auth
+        self.packer.pack_enum(flavor)
+        self.packer.pack_opaque(stuff)
+
+    def pack_callheader(self, xid, prog, vers, proc, cred, verf):
+        self.packer.pack_uint(xid)
+        self.packer.pack_enum(CALL)
+        self.packer.pack_uint(RPCVERSION)
+        self.packer.pack_uint(prog)
+        self.packer.pack_uint(vers)
+        self.packer.pack_uint(proc)
+        self.pack_auth(cred)
+        self.pack_auth(verf)
+
+    def unpack_auth(self):
+        flavor = self.unpacker.unpack_enum()
+        stuff = self.unpacker.unpack_opaque()
+        return (flavor, stuff)
+
+    def unpack_replyheader(self):
+        xid = self.unpacker.unpack_uint()
+        mtype = self.unpacker.unpack_enum()
+        if mtype != REPLY:
+            raise exception.CohoException(
+                _('no REPLY but %r') % (mtype,))
+        stat = self.unpacker.unpack_enum()
+        if stat == MSG_DENIED:
+            stat = self.unpacker.unpack_enum()
+            if stat == RPC_MISMATCH:
+                low = self.unpacker.unpack_uint()
+                high = self.unpacker.unpack_uint()
+                raise exception.CohoException(
+                    _('MSG_DENIED: RPC_MISMATCH: %r') % ((low, high),))
+            if stat == AUTH_ERROR:
+                stat = self.unpacker.unpack_uint()
+                raise exception.CohoException(
+                    _('MSG_DENIED: AUTH_ERROR: %r') % (stat,))
+            raise exception.CohoException(_('MSG_DENIED: %r') % (stat,))
+        if stat != MSG_ACCEPTED:
+            raise exception.CohoException(
+                _('Neither MSG_DENIED nor MSG_ACCEPTED: %r') % (stat,))
+        verf = self.unpack_auth()
+        stat = self.unpacker.unpack_enum()
+        if stat == PROG_UNAVAIL:
+            raise exception.CohoException(_('call failed: PROG_UNAVAIL'))
+        if stat == PROG_MISMATCH:
+            low = self.unpacker.unpack_uint()
+            high = self.unpacker.unpack_uint()
+            raise exception.CohoException(
+                _('call failed: PROG_MISMATCH: %r') % ((low, high),))
+        if stat == PROC_UNAVAIL:
+            raise exception.CohoException(_('call failed: PROC_UNAVAIL'))
+        if stat == GARBAGE_ARGS:
+            raise exception.CohoException(_('call failed: GARBAGE_ARGS'))
+        if stat != SUCCESS:
+            raise exception.CohoException(_('call failed: %r') % (stat,))
+        return xid, verf
+
+    def init_call(self, proc, args):
+        self.make_xid()
+        self.packer.reset()
+        cred = self.make_cred()
+        verf = self.make_verf()
+        self.pack_callheader(self.xid, self.prog, self.vers, proc, cred, verf)
+
+        for arg, func in args:
+            func(arg)
+
+        return self.xid, self.packer.get_buf()
+
+    def _sendfrag(self, last, frag):
+        x = len(frag)
+        if last:
+            x = x | 0x80000000
+        header = (six.int2byte(int(x >> 24 & 0xff)) +
+                  six.int2byte(int(x >> 16 & 0xff)) +
+                  six.int2byte(int(x >> 8 & 0xff)) +
+                  six.int2byte(int(x & 0xff)))
+        self.sock.send(header + frag)
+
+    def _sendrecord(self, record):
+        self._sendfrag(1, record)
+
+    def _recvfrag(self):
+        header = self.sock.recv(4)
+        if len(header) < 4:
+            raise exception.CohoException(
+                _('Invalid response header from RPC server'))
+        x = (six.indexbytes(header, 0) << 24 |
+             six.indexbytes(header, 1) << 16 |
+             six.indexbytes(header, 2) << 8 |
+             six.indexbytes(header, 3))
+        last = ((x & 0x80000000) != 0)
+        n = int(x & 0x7fffffff)
+        frag = six.b('')
+        while n > 0:
+            buf = self.sock.recv(n)
+            if not buf:
+                raise exception.CohoException(
+                    _('RPC server response is incomplete'))
+            n = n - len(buf)
+            frag = frag + buf
+        return last, frag
+
+    def _recvrecord(self):
+        record = six.b('')
+        last = 0
+        while not last:
+            last, frag = self._recvfrag()
+            record = record + frag
+        return record
+
+    def _make_call(self, proc, args):
+        self.packer.reset()
+        xid, call = self.init_call(proc, args)
+        self._sendrecord(call)
+        reply = self._recvrecord()
+        self.unpacker.reset(reply)
+        xid, verf = self.unpack_replyheader()
+
+    def _call(self, proc, args):
+        self._make_call(proc, args)
+        res = self.unpacker.unpack_uint()
+        if res != SUCCESS:
+            raise exception.CohoException(os.strerror(res))
+
+
+class CohoRPCClient(Client):
+
+    def __init__(self, address, port):
+        Client.__init__(self, address, COHO_PROGRAM, 1, port)
+
+    def create_snapshot(self, src, dst, flags):
+        self._call(COHO1_CREATE_SNAPSHOT,
+                   [(six.b(src), self.packer.pack_string),
+                       (six.b(dst), self.packer.pack_string),
+                       (flags, self.packer.pack_uint)])
+
+    def delete_snapshot(self, name):
+        self._call(COHO1_DELETE_SNAPSHOT,
+                   [(six.b(name), self.packer.pack_string)])
+
+    def create_volume_from_snapshot(self, src, dst):
+        self._call(COHO1_CREATE_VOLUME_FROM_SNAPSHOT,
+                   [(six.b(src), self.packer.pack_string),
+                       (six.b(dst), self.packer.pack_string)])
+
+
+#
+# Coho Data Volume Driver
+#
+
+VERSION = '1.0.0'
+
+LOG = logging.getLogger(__name__)
+
+coho_opts = [
+    cfg.IntOpt('coho_rpc_port',
+               default=2049,
+               help='RPC port to connect to Coha Data MicroArray')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(coho_opts)
+
+
+class CohoDriver(nfs.NfsDriver):
+    """Coho Data NFS based cinder driver.
+
+    Creates file on NFS share for using it as block device on hypervisor.
+    Version history:
+    1.0.0 - Initial driver
+    """
+
+    # We have to overload this attribute of RemoteFSDriver because
+    # unfortunately the base method doesn't accept exports of the form:
+    # <address>:/
+    # It expects a non blank export name following the /.
+    # We are more permissive.
+    SHARE_FORMAT_REGEX = r'.+:/.*'
+
+    def __init__(self, *args, **kwargs):
+        super(CohoDriver, self).__init__(*args, **kwargs)
+        self.configuration.append_config_values(coho_opts)
+        self._execute_as_root = True
+        self._rpcclients = dict()
+        self._backend_name = (self.configuration.volume_backend_name or
+                              self.__class__.__name__)
+
+    def _init_rpcclient(self, addr, port):
+        client = CohoRPCClient(addr, port)
+        self._rpcclients[(addr, port)] = client
+        return client
+
+    def _get_rpcclient(self, addr, port):
+        if (addr, port) in self._rpcclients:
+            return self._rpcclients[(addr, port)]
+        return self._init_rpcclient(addr, port)
+
+    def do_setup(self, context):
+        """Any initialization the volume driver does while starting."""
+        super(CohoDriver, self).do_setup(context)
+        self._context = context
+
+        config = self.configuration.coho_rpc_port
+        if not config:
+            msg = _("Coho rpc port is not configured")
+            LOG.warning(msg)
+            raise exception.CohoException(msg)
+        if config < 1 or config > 65535:
+            msg = (_("Invalid port number %(config)s for Coho rpc port") %
+                   {'config': config})
+            LOG.warning(msg)
+            raise exception.CohoException(msg)
+
+    def _do_clone_volume(self, volume, src):
+        """Clone volume to source.
+
+        Create a volume on given remote share with the same contents
+        as the specified source.
+        """
+        volume_path = self.local_path(volume)
+        source_path = self.local_path(src)
+
+        self._execute('cp', source_path, volume_path,
+                      run_as_root=self._execute_as_root)
+
+    def _get_volume_location(self, volume_id):
+        """Returns provider location for given volume."""
+
+        # The driver should not directly access db, but since volume is not
+        # passed in create_snapshot and delete_snapshot we are forced to read
+        # the volume info from the database
+        volume = self.db.volume_get(self._context, volume_id)
+        addr, path = volume.provider_location.split(":")
+        return addr, path
+
+    def create_snapshot(self, snapshot):
+        """Create a volume snapshot."""
+        addr, path = self._get_volume_location(snapshot['volume_id'])
+        volume_path = os.path.join(path, snapshot['volume_name'])
+        snapshot_name = snapshot['name']
+        flags = 0  # unused at this time
+        client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
+        client.create_snapshot(volume_path, snapshot_name, flags)
+
+    def delete_snapshot(self, snapshot):
+        """Delete a volume snapshot."""
+        addr, path = self._get_volume_location(snapshot['volume_id'])
+        snapshot_name = snapshot['name']
+        client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
+        client.delete_snapshot(snapshot_name)
+
+    def create_volume_from_snapshot(self, volume, snapshot):
+        """Create a volume from a snapshot."""
+        volume['provider_location'] = self._find_share(volume['size'])
+        addr, path = volume['provider_location'].split(":")
+        volume_path = os.path.join(path, volume['name'])
+        snapshot_name = snapshot['name']
+        client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
+        client.create_volume_from_snapshot(snapshot_name, volume_path)
+
+        return {'provider_location': volume['provider_location']}
+
+    def _extend_file_sparse(self, path, size):
+        """Extend the size of a file (with no additional disk usage)."""
+        self._execute('truncate', '-s', '%sG' % size,
+                      path, run_as_root=self._execute_as_root)
+
+    def create_cloned_volume(self, volume, src_vref):
+        volume['provider_location'] = self._find_share(volume['size'])
+
+        self._do_clone_volume(volume, src_vref)
+
+    def extend_volume(self, volume, new_size):
+        """Extend the specified file to the new_size (sparsely)."""
+        volume_path = self.local_path(volume)
+
+        self._extend_file_sparse(volume_path, new_size)
+
+    def get_volume_stats(self, refresh):
+        """Pass in Coho Data information in volume stats."""
+        _stats = super(CohoDriver, self).get_volume_stats(refresh)
+        _stats["vendor_name"] = 'Coho Data'
+        _stats["driver_version"] = VERSION
+        _stats["storage_protocol"] = 'NFS'
+        _stats["volume_backend_name"] = self._backend_name
+
+        return _stats
diff --git a/releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml b/releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml
new file mode 100644 (file)
index 0000000..4a35861
--- /dev/null
@@ -0,0 +1,3 @@
+---
+features:
+  - Added backend driver for Coho Data storage.
index 3305ce77f45929016c904883909c4c7b9d4485e2..1ebf728626df4bf8c2cefb16f0cf9e50217f991b 100644 (file)
@@ -48,6 +48,7 @@ cinder.tests.unit.test_block_device
 cinder.tests.unit.test_blockbridge
 cinder.tests.unit.test_cloudbyte
 cinder.tests.unit.test_cmd
+cinder.tests.unit.test_coho
 cinder.tests.unit.test_conf
 cinder.tests.unit.test_context
 cinder.tests.unit.test_db_api