# under the License.
import collections
+import eventlet
import six
import sys
+import time
import mock
from oslo_utils import importutils
FLAG_UPD_CONFIG = "upd_config"
FLAG_STANDBY = "standby"
FLAG_QIGNORE = "qignore"
+ FLAG_REMOVE = "remove"
AUX_PROP_PREFIX = "aux:"
BOOL_TRUE = "true"
- BOOL_FALSE = "true"
+ BOOL_FALSE = "false"
+
+ VOL_ID = "vol_id"
class mock_dm_exc(object):
DM_SUCCESS = 0
- DM_EEXIST = 1
- DM_ENOENT = 2
+ DM_INFO = 1
+ DM_EEXIST = 101
+ DM_ENOENT = 102
DM_ERROR = 1000
def __init__(self):
self.calls = []
+ def run_external_plugin(self, name, props):
+ self.calls.append(["run_external_plugin", name, props])
+
+ call_okay = [[mock_dm_exc.DM_SUCCESS, "ACK", []]]
+ not_done_yet = (call_okay,
+ dict(timeout=mock_dm_consts.BOOL_FALSE,
+ result=mock_dm_consts.BOOL_FALSE))
+ success = (call_okay,
+ dict(timeout=mock_dm_consts.BOOL_FALSE,
+ result=mock_dm_consts.BOOL_TRUE))
+ got_timeout = (call_okay,
+ dict(timeout=mock_dm_consts.BOOL_TRUE,
+ result=mock_dm_consts.BOOL_FALSE))
+
+ if "retry" not in props:
+ # Fake success, to not slow tests down
+ return success
+
+ if props["retry"] > 1:
+ props["retry"] -= 1
+ return not_done_yet
+
+ if props.get("run-into-timeout"):
+ return got_timeout
+
+ return success
+
def list_resources(self, res, serial, prop, req):
self.calls.append(["list_resources", res, prop, req])
if ('aux:cinder-id' in prop and
def create_volume(self, res, size, props):
self.calls.append(["create_volume", res, size, props])
- return [[mock_dm_exc.DM_SUCCESS, "ack", []]]
+ return [[mock_dm_exc.DM_SUCCESS, "ack", []],
+ [mock_dm_exc.DM_INFO,
+ "create_volume",
+ [(mock_dm_consts.VOL_ID, '2')]]]
def auto_deploy(self, res, red, delta, site_clients):
self.calls.append(["auto_deploy", res, red, delta, site_clients])
class DrbdManageIscsiTestCase(test.TestCase):
+ def _fake_safe_get(self, key):
+ if key == 'iscsi_helper':
+ return 'fake'
+
+ if key.endswith('_policy'):
+ return '{}'
+
+ return None
+
+ @staticmethod
+ def _fake_sleep(amount):
+ pass
+
def setUp(self):
self.ctxt = context.get_admin_context()
self._mock = mock.Mock()
'_wait_for_node_assignment',
self.fake_wait_node_assignment)
- self.configuration.safe_get = lambda x: 'fake'
+ self.configuration.safe_get = self._fake_safe_get
+
+ self.stubs.Set(eventlet, 'sleep', self._fake_sleep)
# Infrastructure
def fake_import_object(self, what, configuration, db, executor):
self.assertEqual("create_volume", dmd.odm.calls[2][0])
self.assertEqual(1048576, dmd.odm.calls[2][2])
self.assertEqual("auto_deploy", dmd.odm.calls[3][0])
- self.assertEqual(len(dmd.odm.calls), 4)
+ self.assertEqual(5, len(dmd.odm.calls))
def test_create_volume_controller_all_vols(self):
testvol = {'project_id': 'testprjid',
dmd.drbdmanage_devs_on_controller = True
dmd.odm = DrbdManageFakeDriver()
dmd.create_volume(testvol)
+ self.assertEqual(6, len(dmd.odm.calls))
self.assertEqual("create_resource", dmd.odm.calls[0][0])
self.assertEqual("list_volumes", dmd.odm.calls[1][0])
self.assertEqual("create_volume", dmd.odm.calls[2][0])
self.assertEqual(1048576, dmd.odm.calls[2][2])
self.assertEqual("auto_deploy", dmd.odm.calls[3][0])
- self.assertEqual("assign", dmd.odm.calls[4][0])
- self.assertEqual(len(dmd.odm.calls), 5)
+ self.assertEqual("run_external_plugin", dmd.odm.calls[4][0])
+ self.assertEqual("assign", dmd.odm.calls[5][0])
def test_delete_volume(self):
testvol = {'project_id': 'testprjid',
self.assertEqual("list_volumes", dmd.odm.calls[0][0])
self.assertEqual("list_assignments", dmd.odm.calls[1][0])
self.assertEqual("create_snapshot", dmd.odm.calls[2][0])
- self.assertEqual("list_snapshots", dmd.odm.calls[3][0])
- self.assertEqual("restore_snapshot", dmd.odm.calls[4][0])
- self.assertEqual("list_snapshots", dmd.odm.calls[5][0])
- self.assertEqual("remove_snapshot", dmd.odm.calls[6][0])
- self.assertEqual("remove_snapshot", dmd.odm.calls[6][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[3][0])
+ self.assertEqual("list_snapshots", dmd.odm.calls[4][0])
+ self.assertEqual("restore_snapshot", dmd.odm.calls[5][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[6][0])
+ self.assertEqual("list_snapshots", dmd.odm.calls[7][0])
+ self.assertEqual("remove_snapshot", dmd.odm.calls[8][0])
class DrbdManageDrbdTestCase(DrbdManageIscsiTestCase):
self.assertEqual("list_volumes", dmd.odm.calls[3][0])
self.assertEqual("text_query", dmd.odm.calls[4][0])
self.assertEqual("local", x["driver_volume_type"])
+
+
+class DrbdManageCommonTestCase(DrbdManageIscsiTestCase):
+ def setUp(self):
+ super(DrbdManageCommonTestCase, self).setUp()
+
+ def test_drbd_policy_loop_timeout(self):
+ dmd = drv.DrbdManageDrbdDriver(configuration=self.configuration)
+ dmd.odm = DrbdManageFakeDriver()
+
+ res = dmd._call_policy_plugin('void', {},
+ {'retry': 4,
+ 'run-into-timeout': True})
+ self.assertFalse(res)
+ self.assertEqual(4, len(dmd.odm.calls))
+ self.assertEqual("run_external_plugin", dmd.odm.calls[0][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[1][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[2][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[3][0])
+
+ def test_drbd_policy_loop_success(self):
+ dmd = drv.DrbdManageDrbdDriver(configuration=self.configuration)
+ dmd.odm = DrbdManageFakeDriver()
+
+ res = dmd._call_policy_plugin('void',
+ {'base': 'data',
+ 'retry': 4},
+ {'override': 'xyz'})
+ self.assertTrue(res)
+ self.assertEqual(4, len(dmd.odm.calls))
+ self.assertEqual("run_external_plugin", dmd.odm.calls[0][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[1][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[2][0])
+ self.assertEqual("run_external_plugin", dmd.odm.calls[3][0])
+
+ def test_drbd_policy_loop_simple(self):
+ dmd = drv.DrbdManageDrbdDriver(configuration=self.configuration)
+ dmd.odm = DrbdManageFakeDriver()
+
+ res = dmd._call_policy_plugin('policy-name',
+ {'base': "value",
+ 'over': "ignore"},
+ {'over': "ride",
+ 'starttime': 0})
+ self.assertTrue(res)
+
+ self.assertEqual(1, len(dmd.odm.calls))
+ self.assertEqual("run_external_plugin", dmd.odm.calls[0][0])
+ self.assertEqual('policy-name', dmd.odm.calls[0][1])
+
+ incoming = dmd.odm.calls[0][2]
+ self.assertGreaterEqual(4, abs(float(incoming['starttime']) -
+ time.time()))
+ self.assertEqual('value', incoming['base'])
+ self.assertEqual('ride', incoming['over'])
import eventlet
+import json
import six
import socket
import time
cfg.IntOpt('drbdmanage_redundancy',
default=1,
help='Number of nodes that should replicate the data.'),
+ cfg.StrOpt('drbdmanage_resource_policy',
+ default='{"ratio": "0.51", "timeout": "60"}',
+ help='Resource deployment completion wait policy.'),
+ cfg.StrOpt('drbdmanage_snapshot_policy',
+ default='{"count": "1", "timeout": "60"}',
+ help='Snapshot completion wait policy.'),
+ cfg.StrOpt('drbdmanage_resize_policy',
+ default='{"timeout": "60"}',
+ help='Volume resize completion wait policy.'),
+ cfg.StrOpt('drbdmanage_resource_plugin',
+ default="drbdmanage.plugins.plugins.wait_for.WaitForResource",
+ help='Resource deployment completion wait plugin.'),
+ cfg.StrOpt('drbdmanage_snapshot_plugin',
+ default="drbdmanage.plugins.plugins.wait_for.WaitForSnapshot",
+ help='Snapshot completion wait plugin.'),
+ cfg.StrOpt('drbdmanage_resize_plugin',
+ default="drbdmanage.plugins.plugins.wait_for.WaitForVolumeSize",
+ help='Volume resize completion wait plugin.'),
cfg.BoolOpt('drbdmanage_devs_on_controller',
default=True,
help='''If set, the c-vol node will receive a useable
self.backend_name = self.configuration.safe_get(
'volume_backend_name') or 'drbdmanage'
+ js_decoder = json.JSONDecoder()
+ self.policy_resource = js_decoder.decode(
+ self.configuration.safe_get('drbdmanage_resource_policy'))
+ self.policy_snapshot = js_decoder.decode(
+ self.configuration.safe_get('drbdmanage_snapshot_policy'))
+ self.policy_resize = js_decoder.decode(
+ self.configuration.safe_get('drbdmanage_resize_policy'))
+
+ self.plugin_resource = self.configuration.safe_get(
+ 'drbdmanage_resource_plugin')
+ self.plugin_snapshot = self.configuration.safe_get(
+ 'drbdmanage_snapshot_plugin')
+ self.plugin_resize = self.configuration.safe_get(
+ 'drbdmanage_resize_plugin')
+
# needed as per pep8:
# F841 local variable 'CS_DEPLOYED' is assigned to but never used
global CS_DEPLOYED, CS_DISKLESS, CS_UPD_CON
# Old function object is invalid, get new one.
return getattr(self.odm, fn._method_name)(*args)
+ def _fetch_answer_data(self, res, key, level=None, req=True):
+ for code, fmt, data in res:
+ if code == dm_exc.DM_INFO:
+ if level and level != fmt:
+ continue
+
+ value = [v for k, v in data if k == key]
+ if value:
+ if len(value) == 1:
+ return value[0]
+ else:
+ return value
+
+ if req:
+ if level:
+ l = level + ":" + key
+ else:
+ l = key
+
+ msg = _('DRBDmanage driver error: expected key "%s" '
+ 'not in answer, wrong DRBDmanage version?') % l
+ LOG.error(msg)
+ raise exception.VolumeDriverException(message=msg)
+
+ return None
+
def do_setup(self, context):
"""Any initialization the volume driver does while starting."""
super(DrbdManageBaseDriver, self).do_setup(context)
if code == dm_exc.DM_SUCCESS:
seen_success = True
continue
- if hasattr(dm_exc, 'DM_INFO'):
- if code == dm_exc.DM_INFO:
- continue
+ if code == dm_exc.DM_INFO:
+ continue
seen_error = _("Received error string: %s") % (fmt % arg)
if seen_error:
except ValueError:
return None
+ def _call_policy_plugin(self, plugin, pol_base, pol_this):
+ """Returns True for done, False for timeout."""
+
+ pol_inp_data = dict(pol_base)
+ pol_inp_data.update(pol_this,
+ starttime=str(time.time()))
+
+ retry = 0
+ while True:
+ res, pol_result = self.call_or_reconnect(
+ self.odm.run_external_plugin,
+ plugin,
+ pol_inp_data)
+ self._check_result(res)
+
+ if pol_result['result'] == dm_const.BOOL_TRUE:
+ return True
+
+ if pol_result['timeout'] == dm_const.BOOL_TRUE:
+ return False
+
+ eventlet.sleep(min(0.5 + retry / 5, 2))
+ retry += 1
+
def _wait_for_node_assignment(self, res_name, vol_nr, nodenames,
filter_props=None, timeout=90,
check_vol_deployed=True):
"""Return True as soon as one assignment matches the filter."""
+ # TODO(LINBIT): unify with policy plugins
+
if not filter_props:
filter_props = self.empty_dict
self._vol_size_to_dm(volume['size']),
props)
self._check_result(res)
+ drbd_vol = self._fetch_answer_data(res, dm_const.VOL_ID)
# If we crashed between create_volume and the deploy call,
# the volume might be defined but not exist on any server. Oh my.
0, True)
self._check_result(res)
- # TODO(pm): CG
- self._wait_for_node_assignment(d_res_name, 0, self.empty_list)
+ okay = self._call_policy_plugin(self.plugin_resource,
+ self.policy_resource,
+ dict(resource=d_res_name,
+ volnr=str(drbd_vol)))
+ if not okay:
+ message = (_('DRBDmanage timeout waiting for volume creation; '
+ 'resource "%(res)s", volume "%(vol)s"') %
+ {'res': d_res_name, 'vol': volume['id']})
+ raise exception.VolumeBackendAPIException(data=message)
if self.drbdmanage_devs_on_controller:
# TODO(pm): CG
res = self.call_or_reconnect(self.odm.assign,
socket.gethostname(),
d_res_name,
- self.empty_dict)
+ [(dm_const.FLAG_DISKLESS,
+ dm_const.BOOL_TRUE)])
self._check_result(res, ignore=[dm_exc.DM_EEXIST])
return {}
d_res_name, d_vol_nr, False)
self._check_result(res, ignore=[dm_exc.DM_ENOENT])
+ # Ask for volumes in that resource that are not scheduled for deletion.
res, rl = self.call_or_reconnect(self.odm.list_volumes,
[d_res_name],
0,
- self.empty_dict,
+ [(dm_const.TSTATE_PREFIX +
+ dm_const.FLAG_REMOVE,
+ dm_const.BOOL_FALSE)],
self.empty_list)
self._check_result(res)
r_props = self.empty_dict
# TODO(PM): consistency groups => different volume number possible
- v_props = [(0, self._priv_hash_from_volume(volume))]
+ new_vol_nr = 0
+ v_props = [(new_vol_nr, self._priv_hash_from_volume(volume))]
res = self.call_or_reconnect(self.odm.restore_snapshot,
new_res,
sname,
r_props,
v_props)
- return self._check_result(res, ignore=[dm_exc.DM_ENOENT])
+ self._check_result(res, ignore=[dm_exc.DM_ENOENT])
+
+ # TODO(PM): CG
+ okay = self._call_policy_plugin(self.plugin_resource,
+ self.policy_resource,
+ dict(resource=new_res,
+ volnr=str(new_vol_nr)))
+ if not okay:
+ message = (_('DRBDmanage timeout waiting for new volume '
+ 'after snapshot restore; '
+ 'resource "%(res)s", volume "%(vol)s"') %
+ {'res': new_res, 'vol': volume['id']})
+ raise exception.VolumeBackendAPIException(data=message)
def create_cloned_volume(self, volume, src_vref):
temp_id = self._clean_uuid()
self._vol_size_to_dm(new_size),
0)
self._check_result(res)
- return 0
+
+ okay = self._call_policy_plugin(self.plugin_resize,
+ self.policy_resize,
+ dict(resource=d_res_name,
+ volnr=str(d_vol_nr),
+ req_size=str(new_size)))
+ if not okay:
+ message = (_('DRBDmanage timeout waiting for volume size; '
+ 'volume ID "%(id)s" (res "%(res)s", vnr %(vnr)d)') %
+ {'id': volume['id'],
+ 'res': d_res_name, 'vnr': d_vol_nr})
+ raise exception.VolumeBackendAPIException(data=message)
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
self.empty_dict,
[d_res_name],
0,
- self.empty_dict,
+ {CS_DISKLESS: dm_const.BOOL_FALSE},
self.empty_list)
self._check_result(res)
d_res_name, sn_name, nodes, props)
self._check_result(res)
+ okay = self._call_policy_plugin(self.plugin_snapshot,
+ self.policy_snapshot,
+ dict(resource=d_res_name,
+ snapshot=sn_name))
+ if not okay:
+ message = (_('DRBDmanage timeout waiting for snapshot creation; '
+ 'resource "%(res)s", snapshot "%(sn)s"') %
+ {'res': d_res_name, 'sn': sn_name})
+ raise exception.VolumeBackendAPIException(data=message)
+
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""