]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
DRBD: Policy-based waiting for completion
authorPhilipp Marek <philipp.marek@linbit.com>
Mon, 7 Mar 2016 09:53:29 +0000 (10:53 +0100)
committerPhilipp Marek <philipp.marek@linbit.com>
Mon, 7 Mar 2016 09:53:29 +0000 (10:53 +0100)
The more nodes are in a cluster, the higher the chance that one or more
of them are not available. Waiting on all nodes will not work any more,
so give the user a way to define what conditions are acceptable (to the
management, etc.) to continue.

The upcoming DRBDmanage 1.0 release will have a (sample) policy plugin,
so make that available in Cinder, too.
Documentation is at
    http://drbd.linbit.com/users-guide-9.0/s-drbdmanage-deployment-policy.html

DocImpact
Change-Id: Ib2bc888ca5df87a4de97ee07601d0280a04c7a81

cinder/tests/unit/test_drbdmanagedrv.py
cinder/volume/drivers/drbdmanagedrv.py

index 3ad6399953421b2804ea2dcb24c1fe574597659c..f922b6f4c434969e4bc29f4ad9a56b38d06e23fa 100644 (file)
 #    under the License.
 
 import collections
+import eventlet
 import six
 import sys
+import time
 
 import mock
 from oslo_utils import importutils
@@ -60,18 +62,22 @@ class mock_dm_consts(object):
     FLAG_UPD_CONFIG = "upd_config"
     FLAG_STANDBY = "standby"
     FLAG_QIGNORE = "qignore"
+    FLAG_REMOVE = "remove"
 
     AUX_PROP_PREFIX = "aux:"
 
     BOOL_TRUE = "true"
-    BOOL_FALSE = "true"
+    BOOL_FALSE = "false"
+
+    VOL_ID = "vol_id"
 
 
 class mock_dm_exc(object):
 
     DM_SUCCESS = 0
-    DM_EEXIST = 1
-    DM_ENOENT = 2
+    DM_INFO = 1
+    DM_EEXIST = 101
+    DM_ENOENT = 102
     DM_ERROR = 1000
 
 
@@ -138,6 +144,33 @@ class DrbdManageFakeDriver(object):
     def __init__(self):
         self.calls = []
 
+    def run_external_plugin(self, name, props):
+        self.calls.append(["run_external_plugin", name, props])
+
+        call_okay = [[mock_dm_exc.DM_SUCCESS, "ACK", []]]
+        not_done_yet = (call_okay,
+                        dict(timeout=mock_dm_consts.BOOL_FALSE,
+                             result=mock_dm_consts.BOOL_FALSE))
+        success = (call_okay,
+                   dict(timeout=mock_dm_consts.BOOL_FALSE,
+                        result=mock_dm_consts.BOOL_TRUE))
+        got_timeout = (call_okay,
+                       dict(timeout=mock_dm_consts.BOOL_TRUE,
+                            result=mock_dm_consts.BOOL_FALSE))
+
+        if "retry" not in props:
+            # Fake success, to not slow tests down
+            return success
+
+        if props["retry"] > 1:
+            props["retry"] -= 1
+            return not_done_yet
+
+        if props.get("run-into-timeout"):
+            return got_timeout
+
+        return success
+
     def list_resources(self, res, serial, prop, req):
         self.calls.append(["list_resources", res, prop, req])
         if ('aux:cinder-id' in prop and
@@ -154,7 +187,10 @@ class DrbdManageFakeDriver(object):
 
     def create_volume(self, res, size, props):
         self.calls.append(["create_volume", res, size, props])
-        return [[mock_dm_exc.DM_SUCCESS, "ack", []]]
+        return [[mock_dm_exc.DM_SUCCESS, "ack", []],
+                [mock_dm_exc.DM_INFO,
+                 "create_volume",
+                 [(mock_dm_consts.VOL_ID, '2')]]]
 
     def auto_deploy(self, res, red, delta, site_clients):
         self.calls.append(["auto_deploy", res, red, delta, site_clients])
@@ -233,6 +269,19 @@ class DrbdManageFakeDriver(object):
 
 class DrbdManageIscsiTestCase(test.TestCase):
 
+    def _fake_safe_get(self, key):
+        if key == 'iscsi_helper':
+            return 'fake'
+
+        if key.endswith('_policy'):
+            return '{}'
+
+        return None
+
+    @staticmethod
+    def _fake_sleep(amount):
+        pass
+
     def setUp(self):
         self.ctxt = context.get_admin_context()
         self._mock = mock.Mock()
@@ -254,7 +303,9 @@ class DrbdManageIscsiTestCase(test.TestCase):
                        '_wait_for_node_assignment',
                        self.fake_wait_node_assignment)
 
-        self.configuration.safe_get = lambda x: 'fake'
+        self.configuration.safe_get = self._fake_safe_get
+
+        self.stubs.Set(eventlet, 'sleep', self._fake_sleep)
 
     # Infrastructure
     def fake_import_object(self, what, configuration, db, executor):
@@ -294,7 +345,7 @@ class DrbdManageIscsiTestCase(test.TestCase):
         self.assertEqual("create_volume", dmd.odm.calls[2][0])
         self.assertEqual(1048576, dmd.odm.calls[2][2])
         self.assertEqual("auto_deploy", dmd.odm.calls[3][0])
-        self.assertEqual(len(dmd.odm.calls), 4)
+        self.assertEqual(5, len(dmd.odm.calls))
 
     def test_create_volume_controller_all_vols(self):
         testvol = {'project_id': 'testprjid',
@@ -308,13 +359,14 @@ class DrbdManageIscsiTestCase(test.TestCase):
         dmd.drbdmanage_devs_on_controller = True
         dmd.odm = DrbdManageFakeDriver()
         dmd.create_volume(testvol)
+        self.assertEqual(6, len(dmd.odm.calls))
         self.assertEqual("create_resource", dmd.odm.calls[0][0])
         self.assertEqual("list_volumes", dmd.odm.calls[1][0])
         self.assertEqual("create_volume", dmd.odm.calls[2][0])
         self.assertEqual(1048576, dmd.odm.calls[2][2])
         self.assertEqual("auto_deploy", dmd.odm.calls[3][0])
-        self.assertEqual("assign", dmd.odm.calls[4][0])
-        self.assertEqual(len(dmd.odm.calls), 5)
+        self.assertEqual("run_external_plugin", dmd.odm.calls[4][0])
+        self.assertEqual("assign", dmd.odm.calls[5][0])
 
     def test_delete_volume(self):
         testvol = {'project_id': 'testprjid',
@@ -400,11 +452,12 @@ class DrbdManageIscsiTestCase(test.TestCase):
         self.assertEqual("list_volumes", dmd.odm.calls[0][0])
         self.assertEqual("list_assignments", dmd.odm.calls[1][0])
         self.assertEqual("create_snapshot", dmd.odm.calls[2][0])
-        self.assertEqual("list_snapshots", dmd.odm.calls[3][0])
-        self.assertEqual("restore_snapshot", dmd.odm.calls[4][0])
-        self.assertEqual("list_snapshots", dmd.odm.calls[5][0])
-        self.assertEqual("remove_snapshot", dmd.odm.calls[6][0])
-        self.assertEqual("remove_snapshot", dmd.odm.calls[6][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[3][0])
+        self.assertEqual("list_snapshots", dmd.odm.calls[4][0])
+        self.assertEqual("restore_snapshot", dmd.odm.calls[5][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[6][0])
+        self.assertEqual("list_snapshots", dmd.odm.calls[7][0])
+        self.assertEqual("remove_snapshot", dmd.odm.calls[8][0])
 
 
 class DrbdManageDrbdTestCase(DrbdManageIscsiTestCase):
@@ -438,3 +491,58 @@ class DrbdManageDrbdTestCase(DrbdManageIscsiTestCase):
         self.assertEqual("list_volumes", dmd.odm.calls[3][0])
         self.assertEqual("text_query", dmd.odm.calls[4][0])
         self.assertEqual("local", x["driver_volume_type"])
+
+
+class DrbdManageCommonTestCase(DrbdManageIscsiTestCase):
+    def setUp(self):
+        super(DrbdManageCommonTestCase, self).setUp()
+
+    def test_drbd_policy_loop_timeout(self):
+        dmd = drv.DrbdManageDrbdDriver(configuration=self.configuration)
+        dmd.odm = DrbdManageFakeDriver()
+
+        res = dmd._call_policy_plugin('void', {},
+                                      {'retry': 4,
+                                       'run-into-timeout': True})
+        self.assertFalse(res)
+        self.assertEqual(4, len(dmd.odm.calls))
+        self.assertEqual("run_external_plugin", dmd.odm.calls[0][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[1][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[2][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[3][0])
+
+    def test_drbd_policy_loop_success(self):
+        dmd = drv.DrbdManageDrbdDriver(configuration=self.configuration)
+        dmd.odm = DrbdManageFakeDriver()
+
+        res = dmd._call_policy_plugin('void',
+                                      {'base': 'data',
+                                       'retry': 4},
+                                      {'override': 'xyz'})
+        self.assertTrue(res)
+        self.assertEqual(4, len(dmd.odm.calls))
+        self.assertEqual("run_external_plugin", dmd.odm.calls[0][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[1][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[2][0])
+        self.assertEqual("run_external_plugin", dmd.odm.calls[3][0])
+
+    def test_drbd_policy_loop_simple(self):
+        dmd = drv.DrbdManageDrbdDriver(configuration=self.configuration)
+        dmd.odm = DrbdManageFakeDriver()
+
+        res = dmd._call_policy_plugin('policy-name',
+                                      {'base': "value",
+                                       'over': "ignore"},
+                                      {'over': "ride",
+                                       'starttime': 0})
+        self.assertTrue(res)
+
+        self.assertEqual(1, len(dmd.odm.calls))
+        self.assertEqual("run_external_plugin", dmd.odm.calls[0][0])
+        self.assertEqual('policy-name', dmd.odm.calls[0][1])
+
+        incoming = dmd.odm.calls[0][2]
+        self.assertGreaterEqual(4, abs(float(incoming['starttime']) -
+                                       time.time()))
+        self.assertEqual('value', incoming['base'])
+        self.assertEqual('ride', incoming['over'])
index 1ba6985b3b25b6c71234f4657c3903a830a7cdc9..af9bc294dcf22b135c9b92eb6bb0cbc34dd1f3ca 100644 (file)
@@ -24,6 +24,7 @@ for more details.
 
 
 import eventlet
+import json
 import six
 import socket
 import time
@@ -58,6 +59,24 @@ drbd_opts = [
     cfg.IntOpt('drbdmanage_redundancy',
                default=1,
                help='Number of nodes that should replicate the data.'),
+    cfg.StrOpt('drbdmanage_resource_policy',
+               default='{"ratio": "0.51", "timeout": "60"}',
+               help='Resource deployment completion wait policy.'),
+    cfg.StrOpt('drbdmanage_snapshot_policy',
+               default='{"count": "1", "timeout": "60"}',
+               help='Snapshot completion wait policy.'),
+    cfg.StrOpt('drbdmanage_resize_policy',
+               default='{"timeout": "60"}',
+               help='Volume resize completion wait policy.'),
+    cfg.StrOpt('drbdmanage_resource_plugin',
+               default="drbdmanage.plugins.plugins.wait_for.WaitForResource",
+               help='Resource deployment completion wait plugin.'),
+    cfg.StrOpt('drbdmanage_snapshot_plugin',
+               default="drbdmanage.plugins.plugins.wait_for.WaitForSnapshot",
+               help='Snapshot completion wait plugin.'),
+    cfg.StrOpt('drbdmanage_resize_plugin',
+               default="drbdmanage.plugins.plugins.wait_for.WaitForVolumeSize",
+               help='Volume resize completion wait plugin.'),
     cfg.BoolOpt('drbdmanage_devs_on_controller',
                 default=True,
                 help='''If set, the c-vol node will receive a useable
@@ -115,6 +134,21 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
         self.backend_name = self.configuration.safe_get(
             'volume_backend_name') or 'drbdmanage'
 
+        js_decoder = json.JSONDecoder()
+        self.policy_resource = js_decoder.decode(
+            self.configuration.safe_get('drbdmanage_resource_policy'))
+        self.policy_snapshot = js_decoder.decode(
+            self.configuration.safe_get('drbdmanage_snapshot_policy'))
+        self.policy_resize = js_decoder.decode(
+            self.configuration.safe_get('drbdmanage_resize_policy'))
+
+        self.plugin_resource = self.configuration.safe_get(
+            'drbdmanage_resource_plugin')
+        self.plugin_snapshot = self.configuration.safe_get(
+            'drbdmanage_snapshot_plugin')
+        self.plugin_resize = self.configuration.safe_get(
+            'drbdmanage_resize_plugin')
+
         # needed as per pep8:
         #   F841 local variable 'CS_DEPLOYED' is assigned to but never used
         global CS_DEPLOYED, CS_DISKLESS, CS_UPD_CON
@@ -137,6 +171,32 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
             # Old function object is invalid, get new one.
             return getattr(self.odm, fn._method_name)(*args)
 
+    def _fetch_answer_data(self, res, key, level=None, req=True):
+        for code, fmt, data in res:
+            if code == dm_exc.DM_INFO:
+                if level and level != fmt:
+                    continue
+
+                value = [v for k, v in data if k == key]
+                if value:
+                    if len(value) == 1:
+                        return value[0]
+                    else:
+                        return value
+
+        if req:
+            if level:
+                l = level + ":" + key
+            else:
+                l = key
+
+            msg = _('DRBDmanage driver error: expected key "%s" '
+                    'not in answer, wrong DRBDmanage version?') % l
+            LOG.error(msg)
+            raise exception.VolumeDriverException(message=msg)
+
+        return None
+
     def do_setup(self, context):
         """Any initialization the volume driver does while starting."""
         super(DrbdManageBaseDriver, self).do_setup(context)
@@ -176,9 +236,8 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
             if code == dm_exc.DM_SUCCESS:
                 seen_success = True
                 continue
-            if hasattr(dm_exc, 'DM_INFO'):
-                if code == dm_exc.DM_INFO:
-                    continue
+            if code == dm_exc.DM_INFO:
+                continue
             seen_error = _("Received error string: %s") % (fmt % arg)
 
         if seen_error:
@@ -209,11 +268,37 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
         except ValueError:
             return None
 
+    def _call_policy_plugin(self, plugin, pol_base, pol_this):
+        """Returns True for done, False for timeout."""
+
+        pol_inp_data = dict(pol_base)
+        pol_inp_data.update(pol_this,
+                            starttime=str(time.time()))
+
+        retry = 0
+        while True:
+            res, pol_result = self.call_or_reconnect(
+                self.odm.run_external_plugin,
+                plugin,
+                pol_inp_data)
+            self._check_result(res)
+
+            if pol_result['result'] == dm_const.BOOL_TRUE:
+                return True
+
+            if pol_result['timeout'] == dm_const.BOOL_TRUE:
+                return False
+
+            eventlet.sleep(min(0.5 + retry / 5, 2))
+            retry += 1
+
     def _wait_for_node_assignment(self, res_name, vol_nr, nodenames,
                                   filter_props=None, timeout=90,
                                   check_vol_deployed=True):
         """Return True as soon as one assignment matches the filter."""
 
+        # TODO(LINBIT): unify with policy plugins
+
         if not filter_props:
             filter_props = self.empty_dict
 
@@ -394,6 +479,7 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                          self._vol_size_to_dm(volume['size']),
                                          props)
             self._check_result(res)
+            drbd_vol = self._fetch_answer_data(res, dm_const.VOL_ID)
 
         # If we crashed between create_volume and the deploy call,
         # the volume might be defined but not exist on any server. Oh my.
@@ -402,15 +488,23 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                      0, True)
         self._check_result(res)
 
-        # TODO(pm): CG
-        self._wait_for_node_assignment(d_res_name, 0, self.empty_list)
+        okay = self._call_policy_plugin(self.plugin_resource,
+                                        self.policy_resource,
+                                        dict(resource=d_res_name,
+                                             volnr=str(drbd_vol)))
+        if not okay:
+            message = (_('DRBDmanage timeout waiting for volume creation; '
+                         'resource "%(res)s", volume "%(vol)s"') %
+                       {'res': d_res_name, 'vol': volume['id']})
+            raise exception.VolumeBackendAPIException(data=message)
 
         if self.drbdmanage_devs_on_controller:
             # TODO(pm): CG
             res = self.call_or_reconnect(self.odm.assign,
                                          socket.gethostname(),
                                          d_res_name,
-                                         self.empty_dict)
+                                         [(dm_const.FLAG_DISKLESS,
+                                           dm_const.BOOL_TRUE)])
             self._check_result(res, ignore=[dm_exc.DM_EEXIST])
 
         return {}
@@ -430,10 +524,13 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                      d_res_name, d_vol_nr, False)
         self._check_result(res, ignore=[dm_exc.DM_ENOENT])
 
+        # Ask for volumes in that resource that are not scheduled for deletion.
         res, rl = self.call_or_reconnect(self.odm.list_volumes,
                                          [d_res_name],
                                          0,
-                                         self.empty_dict,
+                                         [(dm_const.TSTATE_PREFIX +
+                                           dm_const.FLAG_REMOVE,
+                                           dm_const.BOOL_FALSE)],
                                          self.empty_list)
         self._check_result(res)
 
@@ -465,7 +562,8 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
 
         r_props = self.empty_dict
         # TODO(PM): consistency groups => different volume number possible
-        v_props = [(0, self._priv_hash_from_volume(volume))]
+        new_vol_nr = 0
+        v_props = [(new_vol_nr, self._priv_hash_from_volume(volume))]
 
         res = self.call_or_reconnect(self.odm.restore_snapshot,
                                      new_res,
@@ -473,7 +571,19 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                      sname,
                                      r_props,
                                      v_props)
-        return self._check_result(res, ignore=[dm_exc.DM_ENOENT])
+        self._check_result(res, ignore=[dm_exc.DM_ENOENT])
+
+        # TODO(PM): CG
+        okay = self._call_policy_plugin(self.plugin_resource,
+                                        self.policy_resource,
+                                        dict(resource=new_res,
+                                             volnr=str(new_vol_nr)))
+        if not okay:
+            message = (_('DRBDmanage timeout waiting for new volume '
+                         'after snapshot restore; '
+                         'resource "%(res)s", volume "%(vol)s"') %
+                       {'res': new_res, 'vol': volume['id']})
+            raise exception.VolumeBackendAPIException(data=message)
 
     def create_cloned_volume(self, volume, src_vref):
         temp_id = self._clean_uuid()
@@ -540,7 +650,18 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                      self._vol_size_to_dm(new_size),
                                      0)
         self._check_result(res)
-        return 0
+
+        okay = self._call_policy_plugin(self.plugin_resize,
+                                        self.policy_resize,
+                                        dict(resource=d_res_name,
+                                             volnr=str(d_vol_nr),
+                                             req_size=str(new_size)))
+        if not okay:
+            message = (_('DRBDmanage timeout waiting for volume size; '
+                         'volume ID "%(id)s" (res "%(res)s", vnr %(vnr)d)') %
+                       {'id': volume['id'],
+                        'res': d_res_name, 'vnr': d_vol_nr})
+            raise exception.VolumeBackendAPIException(data=message)
 
     def create_snapshot(self, snapshot):
         """Creates a snapshot."""
@@ -553,7 +674,7 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                            self.empty_dict,
                                            [d_res_name],
                                            0,
-                                           self.empty_dict,
+                                           {CS_DISKLESS: dm_const.BOOL_FALSE},
                                            self.empty_list)
         self._check_result(res)
 
@@ -568,6 +689,16 @@ class DrbdManageBaseDriver(driver.VolumeDriver):
                                      d_res_name, sn_name, nodes, props)
         self._check_result(res)
 
+        okay = self._call_policy_plugin(self.plugin_snapshot,
+                                        self.policy_snapshot,
+                                        dict(resource=d_res_name,
+                                             snapshot=sn_name))
+        if not okay:
+            message = (_('DRBDmanage timeout waiting for snapshot creation; '
+                         'resource "%(res)s", snapshot "%(sn)s"') %
+                       {'res': d_res_name, 'sn': sn_name})
+            raise exception.VolumeBackendAPIException(data=message)
+
     def delete_snapshot(self, snapshot):
         """Deletes a snapshot."""