]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Tooz locks
authorSzymon Wróblewski <szymon.wroblewski@intel.com>
Fri, 15 May 2015 14:13:32 +0000 (16:13 +0200)
committerSzymon Wróblewski <szymon.wroblewski@intel.com>
Thu, 17 Dec 2015 10:27:43 +0000 (10:27 +0000)
This change adds support for distributed locks based on Tooz, as part of
the work required to achieve active/active (A/A) HA in the cinder-volume
(c-vol) service.

Co-Authored-By: Gorka Eguileor <geguileo@redhat.com>
Co-Authored-By: Michal Dulko <michal.dulko@intel.com>
Implements: blueprint cinder-volume-active-active-support
Depends-On: I86fa4340f850270b197919896b7f8639c214ceed
Change-Id: I52b8d0a05a3dbedc67f3725f9ba6d009b8d1858f

cinder/coordination.py [new file with mode: 0644]
cinder/exception.py
cinder/opts.py
cinder/tests/unit/test_coordination.py [new file with mode: 0644]
releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml [new file with mode: 0644]
requirements.txt

diff --git a/cinder/coordination.py b/cinder/coordination.py
new file mode 100644 (file)
index 0000000..a4797c0
--- /dev/null
@@ -0,0 +1,286 @@
+# Copyright 2015 Intel
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Coordination and locking utilities."""
+
+import inspect
+import random
+import threading
+import uuid
+
+import eventlet
+from eventlet import tpool
+import itertools
+from oslo_config import cfg
+from oslo_log import log
+import six
+from tooz import coordination
+from tooz import locking
+
+from cinder import exception
+from cinder.i18n import _, _LE, _LI, _LW
+
+LOG = log.getLogger(__name__)
+
+# Options controlling the connection to the Tooz coordination backend.
+coordination_opts = [
+    # Passed to tooz.coordination.get_coordinator() in Coordinator._start().
+    cfg.StrOpt('backend_url',
+               default='file://$state_path',
+               help='The backend URL to use for distributed coordination.'),
+    # Wait time between two heartbeats in Coordinator.heartbeat().
+    cfg.FloatOpt('heartbeat',
+                 default=1.0,
+                 help='Number of seconds between heartbeats for distributed '
+                      'coordination.'),
+    # Starting value of the backoff used by Coordinator._reconnect().
+    cfg.FloatOpt('initial_reconnect_backoff',
+                 default=0.1,
+                 help='Initial number of seconds to wait after failed '
+                      'reconnection.'),
+    # Upper bound of the backoff used by Coordinator._reconnect().
+    cfg.FloatOpt('max_reconnect_backoff',
+                 default=60.0,
+                 help='Maximum number of seconds between sequential '
+                      'reconnection retries.'),
+
+]
+
+CONF = cfg.CONF
+# All options above are registered in the 'coordination' config group.
+CONF.register_opts(coordination_opts, group='coordination')
+
+
+class Coordinator(object):
+    """Tooz coordination wrapper.
+
+    Coordination member id is created from concatenated
+    `prefix` and `agent_id` parameters.
+
+    :param str agent_id: Agent identifier. A random UUID4 string is used
+        when it is not provided.
+    :param str prefix: Used to provide member identifier with a
+        meaningful prefix. It is also prepended to every lock name.
+    """
+
+    def __init__(self, agent_id=None, prefix=''):
+        self.coordinator = None
+        self.agent_id = agent_id or str(uuid.uuid4())
+        self.started = False
+        self.prefix = prefix
+        # Handle of the spawned heartbeat task; None while not beating.
+        self._ev = None
+        # Event set by stop() to end the heartbeat and reconnect loops.
+        self._dead = None
+
+    def is_active(self):
+        """Return True when a backend coordinator object is attached."""
+        return self.coordinator is not None
+
+    def start(self):
+        """Connect to coordination backend and start heartbeat.
+
+        Does nothing when the coordinator is already started.
+
+        :raises tooz.coordination.ToozError: if the backend could not be
+            started. The error is logged and re-raised.
+        """
+        if not self.started:
+            try:
+                self._dead = threading.Event()
+                self._start()
+                self.started = True
+                # NOTE(bluex): Start heartbeat in separate thread to avoid
+                # being blocked by long coroutines.
+                if self.coordinator and self.coordinator.requires_beating:
+                    self._ev = eventlet.spawn(
+                        lambda: tpool.execute(self.heartbeat))
+            except coordination.ToozError:
+                LOG.exception(_LE('Error starting coordination backend.'))
+                if not self.started:
+                    # _start() attaches the backend object before starting
+                    # it. Drop it, otherwise is_active() reports True and
+                    # get_lock() hands out locks of an unstarted backend.
+                    self.coordinator = None
+                raise
+            LOG.info(_LI('Coordination backend started successfully.'))
+
+    def stop(self):
+        """Disconnect from coordination backend and stop heartbeat."""
+        if self.started:
+            # Signal the heartbeat loop before tearing the backend down, so
+            # it does not begin another heartbeat or reconnect cycle against
+            # a coordinator that is being stopped.
+            self._dead.set()
+            self.coordinator.stop()
+            if self._ev is not None:
+                self._ev.wait()
+            self._ev = None
+            self.coordinator = None
+            self.started = False
+
+    def get_lock(self, name):
+        """Return a Tooz backend lock.
+
+        :param str name: The lock name that is used to identify it
+            across all nodes. The coordinator prefix is prepended to it.
+        :raises cinder.exception.LockCreationFailed: if no backend
+            coordinator is attached.
+        """
+        if self.coordinator is not None:
+            return self.coordinator.get_lock(self.prefix + name)
+        else:
+            raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
+
+    def heartbeat(self):
+        """Coordinator heartbeat.
+
+        Method that every couple of seconds (config: `coordination.heartbeat`)
+        sends heartbeat to prove that the member is not dead.
+
+        If connection to coordination backend is broken it tries to
+        reconnect every couple of seconds
+        (config: `coordination.initial_reconnect_backoff` up to
+        `coordination.max_reconnect_backoff`)
+
+        The loop ends once the coordinator is detached or stop() has set
+        the stop event.
+        """
+        while self.coordinator is not None and not self._dead.is_set():
+            try:
+                self._heartbeat()
+            except coordination.ToozConnectionError:
+                self._reconnect()
+            else:
+                # Sleeps for the heartbeat interval, but wakes up right
+                # away when stop() sets the event.
+                self._dead.wait(cfg.CONF.coordination.heartbeat)
+
+    def _start(self):
+        """Create a backend coordinator for this member and start it."""
+        member_id = self.prefix + self.agent_id
+        self.coordinator = coordination.get_coordinator(
+            cfg.CONF.coordination.backend_url, member_id)
+        self.coordinator.start()
+
+    def _heartbeat(self):
+        """Send a single heartbeat to the backend.
+
+        :return: True if the heartbeat was sent, False if it failed with
+            a non-connection Tooz error (which is only logged).
+        :raises tooz.coordination.ToozConnectionError: on connection
+            errors, after logging them.
+        """
+        try:
+            self.coordinator.heartbeat()
+            return True
+        except coordination.ToozConnectionError:
+            LOG.exception(_LE('Connection error while sending a heartbeat '
+                              'to coordination backend.'))
+            raise
+        except coordination.ToozError:
+            LOG.exception(_LE('Error sending a heartbeat to coordination '
+                              'backend.'))
+        return False
+
+    def _reconnect(self):
+        """Reconnect with jittered exponential backoff increase.
+
+        Retries until the backend is started again or until the stop event
+        is set, in which case it returns without reconnecting.
+        """
+        LOG.info(_LI('Reconnecting to coordination backend.'))
+        cap = cfg.CONF.coordination.max_reconnect_backoff
+        backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
+        for attempt in itertools.count(1):
+            # Once stop() has set the event, wait() below returns
+            # immediately; without this check the loop would spin forever
+            # and stop() would hang waiting for the heartbeat task.
+            if self._dead is not None and self._dead.is_set():
+                return
+            try:
+                self._start()
+            except coordination.ToozError:
+                backoff = min(cap, random.uniform(base, backoff * 3))
+                msg = _LW('Reconnect attempt %(attempt)s failed. '
+                          'Next try in %(backoff).2fs.')
+                LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
+                self._dead.wait(backoff)
+            else:
+                LOG.info(_LI('Reconnected to coordination backend.'))
+                return
+
+
+# Module-level coordinator. Lock (and therefore synchronized()) falls back to
+# it when no coordinator is passed explicitly. Its member id and all of its
+# lock names carry the 'cinder-' prefix.
+COORDINATOR = Coordinator(prefix='cinder-')
+
+
+class Lock(locking.Lock):
+    """Lock with dynamic name.
+
+    :param str lock_name: Lock name.
+    :param dict lock_data: Data for lock name formatting.
+    :param coordinator: Coordinator class to use when creating lock.
+        Defaults to the global coordinator.
+    :raises ValueError: if `lock_name` is not a string.
+
+    Using it like so::
+
+        with Lock('mylock'):
+           ...
+
+    ensures that only one process at a time will execute code in context.
+    Lock name can be formatted using Python format string syntax::
+
+        Lock('foo-{volume.id}', {'volume': ...,})
+
+    Available field names are keys of lock_data.
+    """
+    def __init__(self, lock_name, lock_data=None, coordinator=None):
+        # The base class only needs some name; the real (formatted) name
+        # belongs to the backend lock created below.
+        super(Lock, self).__init__(str(id(self)))
+        lock_data = lock_data or {}
+        self.coordinator = coordinator or COORDINATOR
+        self.blocking = True
+        self.lock = self._prepare_lock(lock_name, lock_data)
+
+    def _prepare_lock(self, lock_name, lock_data):
+        """Format the lock name and fetch the lock from the coordinator."""
+        if not isinstance(lock_name, six.string_types):
+            raise ValueError(_('Not a valid string: %s') % lock_name)
+        return self.coordinator.get_lock(lock_name.format(**lock_data))
+
+    def acquire(self, blocking=None):
+        """Attempts to acquire lock.
+
+        :param blocking: If True, blocks until the lock is acquired. If False,
+            returns right away. Otherwise, the value is used as a timeout
+            value and the call returns maximum after this number of seconds.
+            When None, `self.blocking` is used.
+        :return: returns true if acquired (false if not)
+        :rtype: bool
+        """
+        blocking = self.blocking if blocking is None else blocking
+        return self.lock.acquire(blocking=blocking)
+
+    def release(self):
+        """Attempts to release lock.
+
+        The behavior of releasing a lock which was not acquired in the first
+        place is undefined.
+        :return: returns true if released (false if not)
+        :rtype: bool
+        """
+        # Pass the backend result through; it used to be dropped, so callers
+        # always got None despite the documented bool.
+        return self.lock.release()
+
+
+def synchronized(lock_name, blocking=True, coordinator=None):
+    """Decorator serializing calls of a function with a coordination lock.
+
+    :param str lock_name: Lock name.
+    :param blocking: If True, blocks until the lock is acquired.
+            If False, raises exception when not acquired. Otherwise,
+            the value is used as a timeout value and if lock is not acquired
+            after this number of seconds exception is raised.
+    :param coordinator: Coordinator class to use when creating lock.
+        Defaults to the global coordinator.
+    :raises tooz.coordination.LockAcquireFailed: if lock is not acquired
+
+    Decorating a method like so::
+
+        @synchronized('mylock')
+        def foo(self, *args):
+           ...
+
+    ensures that only one process will execute the foo method at a time.
+
+    Different methods can share the same lock::
+
+        @synchronized('mylock')
+        def foo(self, *args):
+           ...
+
+        @synchronized('mylock')
+        def bar(self, *args):
+           ...
+
+    This way only one of either foo or bar can be executing at a time.
+
+    Lock name can be formatted using Python format string syntax::
+
+        @synchronized('{f_name}-{vol.id}-{snap[name]}')
+        def foo(self, vol, snap):
+           ...
+
+    Available field names are: decorated function parameters and
+    `f_name` as a decorated function name.
+    """
+    def decorator(func):
+        @six.wraps(func)
+        def locked_call(*args, **kwargs):
+            # Bind the actual arguments to parameter names so that they can
+            # be referenced from the lock name template.
+            lock_data = inspect.getcallargs(func, *args, **kwargs)
+            lock_data['f_name'] = func.__name__
+            # A fresh lock per call: its name depends on the call arguments.
+            with Lock(lock_name, lock_data, coordinator)(blocking):
+                return func(*args, **kwargs)
+        return locked_call
+    return decorator
index b534453759102b64feaf1fa5e43f9cca8e1e2c98..b70661ba687562d7f363155b7b6f445b7eb28ecb 100644 (file)
@@ -702,6 +702,14 @@ class EvaluatorParseException(Exception):
     message = _("Error during evaluator parsing: %(reason)s")
 
 
+# Raised by cinder.coordination.Coordinator.get_lock() when no coordination
+# backend is attached to the coordinator.
+class LockCreationFailed(CinderException):
+    message = _('Unable to create lock. Coordination backend not started.')
+
+
+# NOTE(review): nothing in this change raises this exception; presumably it
+# is meant for callers reporting a failed lock acquisition -- confirm.
+class LockingFailed(CinderException):
+    message = _('Lock acquisition failed.')
+
+
 UnsupportedObjectError = obj_exc.UnsupportedObjectError
 OrphanedObjectError = obj_exc.OrphanedObjectError
 IncompatibleObjectVersion = obj_exc.IncompatibleObjectVersion
index 314ebd1d2fcf5ba035ec9aca04e118bdbe15c951..a5760434ffcbc72066e7945b3ef02906ec69eaaf 100644 (file)
@@ -34,6 +34,7 @@ from cinder.common import config as cinder_common_config
 import cinder.compute
 from cinder.compute import nova as cinder_compute_nova
 from cinder import context as cinder_context
+from cinder import coordination as cinder_coordination
 from cinder.db import api as cinder_db_api
 from cinder.db import base as cinder_db_base
 from cinder import exception as cinder_exception
@@ -332,6 +333,10 @@ def list_opts():
                 cinder_zonemanager_drivers_brocade_brcdfabricopts.
                 brcd_zone_opts,
             )),
+        ('COORDINATION',
+            itertools.chain(
+                cinder_coordination.coordination_opts,
+            )),
         ('BACKEND',
             itertools.chain(
                 [cinder_cmd_volume.host_opt],
diff --git a/cinder/tests/unit/test_coordination.py b/cinder/tests/unit/test_coordination.py
new file mode 100644 (file)
index 0000000..7d31e51
--- /dev/null
@@ -0,0 +1,131 @@
+# Copyright 2015 Intel
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+import tooz.coordination
+import tooz.locking
+
+from cinder import coordination
+from cinder import test
+
+
+# Raised by MockToozLock.acquire() when a blocking acquire hits a lock name
+# that is already held.
+class Locked(Exception):
+    pass
+
+
+class MockToozLock(tooz.locking.Lock):
+    """In-memory stand-in for a Tooz backend lock.
+
+    Held lock names are tracked in a set shared by all instances, so locks
+    created for the same name contend with each other.
+    """
+    active_locks = set()
+
+    def acquire(self, blocking=True):
+        """Take the lock; raise Locked instead of blocking when it is held."""
+        held = self.name in self.active_locks
+        if held and blocking:
+            raise Locked
+        if held:
+            return False
+        self.active_locks.add(self.name)
+        return True
+
+    def release(self):
+        """Forget the lock name; raises KeyError if it was not held."""
+        self.active_locks.remove(self.name)
+
+
+# The first three patches pass an explicit replacement, so they inject no
+# test arguments. The last two are applied bottom-up, which is why every test
+# receives (get_coordinator, heartbeat) in that order.
+@mock.patch('time.sleep', lambda _: None)
+@mock.patch('eventlet.spawn', lambda f: f())
+@mock.patch('eventlet.tpool.execute', lambda f: f())
+@mock.patch.object(coordination.Coordinator, 'heartbeat')
+@mock.patch('tooz.coordination.get_coordinator')
+class CoordinatorTestCase(test.TestCase):
+    def test_coordinator_start(self, get_coordinator, heartbeat):
+        # start() must create the backend, start it and launch the heartbeat.
+        crd = get_coordinator.return_value
+
+        agent = coordination.Coordinator()
+        agent.start()
+        self.assertTrue(get_coordinator.called)
+        self.assertTrue(heartbeat.called)
+        self.assertTrue(crd.start.called)
+
+    def test_coordinator_stop(self, get_coordinator, heartbeat):
+        # stop() must stop the backend and detach it from the wrapper.
+        crd = get_coordinator.return_value
+
+        agent = coordination.Coordinator()
+        agent.start()
+        self.assertIsNotNone(agent.coordinator)
+        agent.stop()
+        self.assertTrue(crd.stop.called)
+        self.assertIsNone(agent.coordinator)
+
+    def test_coordinator_lock(self, get_coordinator, heartbeat):
+        # Both agents share the mocked backend, so a lock held through one
+        # agent must be seen as taken by the other one as well.
+        crd = get_coordinator.return_value
+        crd.get_lock.side_effect = lambda n: MockToozLock(n)
+
+        agent1 = coordination.Coordinator()
+        agent1.start()
+        agent2 = coordination.Coordinator()
+        agent2.start()
+        self.assertNotIn('lock', MockToozLock.active_locks)
+        with agent1.get_lock('lock'):
+            self.assertIn('lock', MockToozLock.active_locks)
+            self.assertRaises(Locked, agent1.get_lock('lock').acquire)
+            self.assertRaises(Locked, agent2.get_lock('lock').acquire)
+        self.assertNotIn('lock', MockToozLock.active_locks)
+
+    def test_coordinator_offline(self, get_coordinator, heartbeat):
+        # A backend that fails to start must propagate the error and leave
+        # the wrapper unstarted, without a heartbeat.
+        crd = get_coordinator.return_value
+        crd.start.side_effect = tooz.coordination.ToozConnectionError('err')
+
+        agent = coordination.Coordinator()
+        self.assertRaises(tooz.coordination.ToozError, agent.start)
+        self.assertFalse(agent.started)
+        self.assertFalse(heartbeat.called)
+
+    def test_coordinator_reconnect(self, get_coordinator, heartbeat):
+        # Backend start outcomes in call order: the initial start() succeeds,
+        # five reconnect attempts fail, the sixth one succeeds.
+        start_online = iter([True] + [False] * 5 + [True])
+        # Backend heartbeat outcomes: the first one fails, later ones work.
+        heartbeat_online = iter((False, True, True))
+
+        def raiser(cond):
+            if not cond:
+                raise tooz.coordination.ToozConnectionError('err')
+
+        crd = get_coordinator.return_value
+        crd.start.side_effect = lambda *_: raiser(next(start_online))
+        crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online))
+
+        agent = coordination.Coordinator()
+        agent.start()
+        # Coordinator.heartbeat (the loop) is mocked out, so the single-shot
+        # helpers are driven by hand.
+        self.assertRaises(tooz.coordination.ToozConnectionError,
+                          agent._heartbeat)
+        self.assertEqual(1, get_coordinator.call_count)
+        agent._reconnect()
+        # One call from start() plus six reconnect attempts.
+        self.assertEqual(7, get_coordinator.call_count)
+        agent._heartbeat()
+
+
+@mock.patch.object(coordination.COORDINATOR, 'get_lock')
+class CoordinationTestCase(test.TestCase):
+    """Tests of Lock and synchronized() against a mocked global get_lock."""
+
+    def test_lock(self, mock_get_lock):
+        # Entering the context requires the lock to have been created first.
+        lock = coordination.Lock('lock')
+        with lock:
+            self.assertTrue(mock_get_lock.called)
+
+    def test_synchronized(self, mock_get_lock):
+        # The lock name template may use the function name, argument
+        # attributes and argument items.
+        @coordination.synchronized('lock-{f_name}-{foo.val}-{bar[val]}')
+        def func(foo, bar):
+            pass
+
+        with_attribute = mock.Mock()
+        with_attribute.val = 7
+        with_item = mock.MagicMock()
+        with_item.__getitem__.return_value = 8
+
+        func(with_attribute, with_item)
+
+        mock_get_lock.assert_called_with('lock-func-7-8')
diff --git a/releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml b/releasenotes/notes/tooz-locks-0f9f2cc15f8dad5a.yaml
new file mode 100644 (file)
index 0000000..1fb77dc
--- /dev/null
@@ -0,0 +1,4 @@
+---
+features:
+  - Locks may now use Tooz as an abstraction layer, to support distributed
+    lock managers and prepare Cinder to better support HA configurations.
index c5710f19d324a4b486f7ca03849cc222075d5c86..3f249476c31e3cc4f1e08173c8fbe913694c6e26 100644 (file)
@@ -52,3 +52,4 @@ oslo.i18n>=1.5.0 # Apache-2.0
 oslo.vmware>=1.16.0 # Apache-2.0
 os-brick>=0.4.0 # Apache-2.0
 os-win>=0.0.7 # Apache-2.0
+tooz>=1.28.0 # Apache-2.0