From: Ihar Hrachyshka Date: Tue, 14 Oct 2014 13:05:20 +0000 (+0200) Subject: Updated cache module and its dependencies X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=d739790ba8dcdf3180e8d6d09423937aaf802a7c;p=openstack-build%2Fneutron-build.git Updated cache module and its dependencies This is to avoid cache module dependency on timeutils that are now moved to oslo.utils. The following changes are included: * neutron/openstack/common/cache/_backends/memory.py 6ff6b4b Switch oslo-incubator to use oslo.utils and remove old modules 2bedce3 Fix MemoryBackend not purging item from _keys_expired on delete * neutron/openstack/common/cache/backends.py 39625e1 Set pbr 'warnerrors' option for doc build * neutron/openstack/common/cache/cache.py 9c683be fix small typo * neutron/openstack/common/lockutils.py 5d40e14 Remove code that moved to oslo.i18n 7209975 Always log the releasing, even under failure bbb266c Clarify logging in lockutils 942e1aa Use file locks by default again ac995be Fix E126 pep8 errors 15b8352 Remove oslo.log from lockutils Change-Id: I02cb4b2bc4b7bcba948e67cffdb8bd0219c89a29 --- diff --git a/neutron/openstack/common/cache/_backends/memory.py b/neutron/openstack/common/cache/_backends/memory.py index d6f5249fe..5c02cfc40 100644 --- a/neutron/openstack/common/cache/_backends/memory.py +++ b/neutron/openstack/common/cache/_backends/memory.py @@ -14,9 +14,10 @@ import collections +from oslo.utils import timeutils + from neutron.openstack.common.cache import backends from neutron.openstack.common import lockutils -from neutron.openstack.common import timeutils class MemoryBackend(backends.BaseCache): @@ -147,7 +148,7 @@ class MemoryBackend(backends.BaseCache): try: # NOTE(flaper87): Keys with ttl == 0 # don't exist in the _keys_expires dict - self._keys_expires[value[0]].remove(value[1]) + self._keys_expires[value[0]].remove(key) except (KeyError, ValueError): pass diff --git a/neutron/openstack/common/cache/backends.py b/neutron/openstack/common/cache/backends.py index 2fa4aaeb2..1bea8912a 100644 --- a/neutron/openstack/common/cache/backends.py +++ b/neutron/openstack/common/cache/backends.py @@ -26,9 +26,9 @@ class BaseCache(object): :params parsed_url: Parsed url object. :params options: A dictionary with configuration parameters - for the cache. For example: - - default_ttl: An integer defining the default ttl - for keys. + for the cache. For example: + + - default_ttl: An integer defining the default ttl for keys. """ def __init__(self, parsed_url, options=None): @@ -43,20 +43,17 @@ class BaseCache(object): def set(self, key, value, ttl, not_exists=False): """Sets or updates a cache entry - NOTE: Thread-safety is required and has to be - guaranteed by the backend implementation. + .. note:: Thread-safety is required and has to be guaranteed by the + backend implementation. :params key: Item key as string. :type key: `unicode string` - :params value: Value to assign to the key. This - can be anything that is handled - by the current backend. - :params ttl: Key's timeout in seconds. 0 means - no timeout. + :params value: Value to assign to the key. This can be anything that + is handled by the current backend. + :params ttl: Key's timeout in seconds. 0 means no timeout. :type ttl: int - :params not_exists: If True, the key will be set - if it doesn't exist. Otherwise, - it'll always be set. + :params not_exists: If True, the key will be set if it doesn't exist. + Otherwise, it'll always be set. :type not_exists: bool :returns: True if the operation succeeds, False otherwise. @@ -74,9 +71,8 @@ class BaseCache(object): :params key: Item key as string. :type key: `unicode string` - :params value: Value to assign to the key. This - can be anything that is handled - by the current backend. + :params value: Value to assign to the key. This can be anything that + is handled by the current backend. """ try: return self[key] @@ -91,15 +87,14 @@ class BaseCache(object): def get(self, key, default=None): """Gets one item from the cache - NOTE: Thread-safety is required and it has to be - guaranteed by the backend implementation. + .. note:: Thread-safety is required and it has to be guaranteed + by the backend implementation. - :params key: Key for the item to retrieve - from the cache. + :params key: Key for the item to retrieve from the cache. :params default: The default value to return. - :returns: `key`'s value in the cache if it exists, - otherwise `default` should be returned. + :returns: `key`'s value in the cache if it exists, otherwise + `default` should be returned. """ return self._get(key, default) @@ -115,8 +110,8 @@ class BaseCache(object): def __delitem__(self, key): """Removes an item from cache. - NOTE: Thread-safety is required and it has to be - guaranteed by the backend implementation. + .. note:: Thread-safety is required and it has to be guaranteed by + the backend implementation. :params key: The key to remove. @@ -130,8 +125,8 @@ class BaseCache(object): def clear(self): """Removes all items from the cache. - NOTE: Thread-safety is required and it has to be - guaranteed by the backend implementation. + .. note:: Thread-safety is required and it has to be guaranteed by + the backend implementation. """ return self._clear() @@ -143,9 +138,8 @@ class BaseCache(object): """Increments the value for a key :params key: The key for the value to be incremented - :params delta: Number of units by which to increment - the value. Pass a negative number to - decrement the value. + :params delta: Number of units by which to increment the value. + Pass a negative number to decrement the value. :returns: The new value """ @@ -158,10 +152,8 @@ class BaseCache(object): def append_tail(self, key, tail): """Appends `tail` to `key`'s value. - :params key: The key of the value to which - `tail` should be appended. - :params tail: The list of values to append to the - original. + :params key: The key of the value to which `tail` should be appended. + :params tail: The list of values to append to the original. :returns: The new value """ @@ -181,10 +173,8 @@ class BaseCache(object): def append(self, key, value): """Appends `value` to `key`'s value. - :params key: The key of the value to which - `tail` should be appended. - :params value: The value to append to the - original. + :params key: The key of the value to which `tail` should be appended. + :params value: The value to append to the original. :returns: The new value """ @@ -196,8 +186,7 @@ class BaseCache(object): :params key: The key to verify. - :returns: True if the key exists, - otherwise False. + :returns: True if the key exists, otherwise False. """ @abc.abstractmethod @@ -209,9 +198,8 @@ class BaseCache(object): """Gets keys' value from cache :params keys: List of keys to retrieve. - :params default: The default value to return - for each key that is not in - the cache. + :params default: The default value to return for each key that is not + in the cache. :returns: A generator of (key, value) """ @@ -227,13 +215,12 @@ class BaseCache(object): def set_many(self, data, ttl=None): """Puts several items into the cache at once - Depending on the backend, this operation may or may - not be efficient. The default implementation calls - set for each (key, value) pair passed, other backends - support set_many operations as part of their protocols. + Depending on the backend, this operation may or may not be efficient. + The default implementation calls set for each (key, value) pair + passed, other backends support set_many operations as part of their + protocols. - :params data: A dictionary like {key: val} to store - in the cache. + :params data: A dictionary like {key: val} to store in the cache. :params ttl: Key's timeout in seconds. """ diff --git a/neutron/openstack/common/cache/cache.py b/neutron/openstack/common/cache/cache.py index 1247787a2..70c4545de 100644 --- a/neutron/openstack/common/cache/cache.py +++ b/neutron/openstack/common/cache/cache.py @@ -24,7 +24,7 @@ from six.moves.urllib import parse from stevedore import driver -def _get_olso_configs(): +def _get_oslo_configs(): """Returns the oslo.config options to register.""" # NOTE(flaper87): Oslo config should be # optional. Instead of doing try / except @@ -45,7 +45,7 @@ def register_oslo_configs(conf): :params conf: Config object. :type conf: `cfg.ConfigOptions` """ - conf.register_opts(_get_olso_configs()) + conf.register_opts(_get_oslo_configs()) def get_cache(url='memory://'): diff --git a/neutron/openstack/common/lockutils.py b/neutron/openstack/common/lockutils.py index f0c5cb13c..996a810bb 100644 --- a/neutron/openstack/common/lockutils.py +++ b/neutron/openstack/common/lockutils.py @@ -13,10 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. - import contextlib import errno import functools +import logging import os import shutil import subprocess @@ -29,9 +29,7 @@ import weakref from oslo.config import cfg from neutron.openstack.common import fileutils -from neutron.openstack.common.gettextutils import _ -from neutron.openstack.common import local -from neutron.openstack.common import log as logging +from neutron.openstack.common._i18n import _, _LE, _LI LOG = logging.getLogger(__name__) @@ -39,10 +37,10 @@ LOG = logging.getLogger(__name__) util_opts = [ cfg.BoolOpt('disable_process_locking', default=False, - help='Whether to disable inter-process locks'), + help='Enables or disables inter-process locks.'), cfg.StrOpt('lock_path', default=os.environ.get("NEUTRON_LOCK_PATH"), - help=('Directory to use for lock files.')) + help='Directory to use for lock files.') ] @@ -54,7 +52,7 @@ def set_defaults(lock_path): cfg.set_defaults(util_opts, lock_path=lock_path) -class _InterProcessLock(object): +class _FileLock(object): """Lock implementation which allows multiple locks, working around issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does not require any cleanup. Since the lock is always held on a file @@ -76,7 +74,13 @@ class _InterProcessLock(object): self.lockfile = None self.fname = name - def __enter__(self): + def acquire(self): + basedir = os.path.dirname(self.fname) + + if not os.path.exists(basedir): + fileutils.ensure_tree(basedir) + LOG.info(_LI('Created lock path: %s'), basedir) + self.lockfile = open(self.fname, 'w') while True: @@ -86,23 +90,39 @@ class _InterProcessLock(object): # Also upon reading the MSDN docs for locking(), it seems # to have a laughable 10 attempts "blocking" mechanism. self.trylock() - return self + LOG.debug('Got file lock "%s"', self.fname) + return True except IOError as e: if e.errno in (errno.EACCES, errno.EAGAIN): # external locks synchronise things like iptables # updates - give it some time to prevent busy spinning time.sleep(0.01) else: - raise + raise threading.ThreadError(_("Unable to acquire lock on" + " `%(filename)s` due to" + " %(exception)s") % + {'filename': self.fname, + 'exception': e}) - def __exit__(self, exc_type, exc_val, exc_tb): + def __enter__(self): + self.acquire() + return self + + def release(self): try: self.unlock() self.lockfile.close() + LOG.debug('Released file lock "%s"', self.fname) except IOError: - LOG.exception(_("Could not release the acquired lock `%s`"), + LOG.exception(_LE("Could not release the acquired lock `%s`"), self.fname) + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def exists(self): + return os.path.exists(self.fname) + def trylock(self): raise NotImplementedError() @@ -110,7 +130,7 @@ class _InterProcessLock(object): raise NotImplementedError() -class _WindowsLock(_InterProcessLock): +class _WindowsLock(_FileLock): def trylock(self): msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) @@ -118,7 +138,7 @@ class _WindowsLock(_InterProcessLock): msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) -class _PosixLock(_InterProcessLock): +class _FcntlLock(_FileLock): def trylock(self): fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) @@ -131,12 +151,63 @@ if os.name == 'nt': InterProcessLock = _WindowsLock else: import fcntl - InterProcessLock = _PosixLock + InterProcessLock = _FcntlLock _semaphores = weakref.WeakValueDictionary() _semaphores_lock = threading.Lock() +def _get_lock_path(name, lock_file_prefix, lock_path=None): + # NOTE(mikal): the lock name cannot contain directory + # separators + name = name.replace(os.sep, '_') + if lock_file_prefix: + sep = '' if lock_file_prefix.endswith('-') else '-' + name = '%s%s%s' % (lock_file_prefix, sep, name) + + local_lock_path = lock_path or CONF.lock_path + + if not local_lock_path: + raise cfg.RequiredOptError('lock_path') + + return os.path.join(local_lock_path, name) + + +def external_lock(name, lock_file_prefix=None, lock_path=None): + LOG.debug('Attempting to grab external lock "%(lock)s"', + {'lock': name}) + + lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path) + + return InterProcessLock(lock_file_path) + + +def remove_external_lock_file(name, lock_file_prefix=None): + """Remove an external lock file when it's not used anymore + This will be helpful when we have a lot of lock files + """ + with internal_lock(name): + lock_file_path = _get_lock_path(name, lock_file_prefix) + try: + os.remove(lock_file_path) + except OSError: + LOG.info(_LI('Failed to remove file %(file)s'), + {'file': lock_file_path}) + + +def internal_lock(name): + with _semaphores_lock: + try: + sem = _semaphores[name] + LOG.debug('Using existing semaphore "%s"', name) + except KeyError: + sem = threading.Semaphore() + _semaphores[name] = sem + LOG.debug('Created new semaphore "%s"', name) + + return sem + + @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None): """Context based lock @@ -152,67 +223,19 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None): should work across multiple processes. This means that if two different workers both run a method decorated with @synchronized('mylock', external=True), only one of them will execute at a time. - - :param lock_path: The lock_path keyword argument is used to specify a - special location for external lock files to live. If nothing is set, then - CONF.lock_path is used as a default. """ - with _semaphores_lock: - try: - sem = _semaphores[name] - except KeyError: - sem = threading.Semaphore() - _semaphores[name] = sem - - with sem: - LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) - - # NOTE(mikal): I know this looks odd - if not hasattr(local.strong_store, 'locks_held'): - local.strong_store.locks_held = [] - local.strong_store.locks_held.append(name) - + int_lock = internal_lock(name) + with int_lock: + LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name}) try: if external and not CONF.disable_process_locking: - LOG.debug(_('Attempting to grab file lock "%(lock)s"'), - {'lock': name}) - - # We need a copy of lock_path because it is non-local - local_lock_path = lock_path or CONF.lock_path - if not local_lock_path: - raise cfg.RequiredOptError('lock_path') - - if not os.path.exists(local_lock_path): - fileutils.ensure_tree(local_lock_path) - LOG.info(_('Created lock path: %s'), local_lock_path) - - def add_prefix(name, prefix): - if not prefix: - return name - sep = '' if prefix.endswith('-') else '-' - return '%s%s%s' % (prefix, sep, name) - - # NOTE(mikal): the lock name cannot contain directory - # separators - lock_file_name = add_prefix(name.replace(os.sep, '_'), - lock_file_prefix) - - lock_file_path = os.path.join(local_lock_path, lock_file_name) - - try: - lock = InterProcessLock(lock_file_path) - with lock as lock: - LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), - {'lock': name, 'path': lock_file_path}) - yield lock - finally: - LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), - {'lock': name, 'path': lock_file_path}) + ext_lock = external_lock(name, lock_file_prefix, lock_path) + with ext_lock: + yield ext_lock else: - yield sem - + yield int_lock finally: - local.strong_store.locks_held.remove(name) + LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name}) def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): @@ -244,11 +267,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): def inner(*args, **kwargs): try: with lock(name, lock_file_prefix, external, lock_path): - LOG.debug(_('Got semaphore / lock "%(function)s"'), + LOG.debug('Got semaphore / lock "%(function)s"', {'function': f.__name__}) return f(*args, **kwargs) finally: - LOG.debug(_('Semaphore / lock released "%(function)s"'), + LOG.debug('Semaphore / lock released "%(function)s"', {'function': f.__name__}) return inner return wrap