:params parsed_url: Parsed url object.
:params options: A dictionary with configuration parameters
- for the cache. For example:
- - default_ttl: An integer defining the default ttl
- for keys.
+ for the cache. For example:
+
+ - default_ttl: An integer defining the default ttl for keys.
"""
def __init__(self, parsed_url, options=None):
def set(self, key, value, ttl, not_exists=False):
"""Sets or updates a cache entry
- NOTE: Thread-safety is required and has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and has to be guaranteed by the
+ backend implementation.
:params key: Item key as string.
:type key: `unicode string`
- :params value: Value to assign to the key. This
- can be anything that is handled
- by the current backend.
- :params ttl: Key's timeout in seconds. 0 means
- no timeout.
+ :params value: Value to assign to the key. This can be anything that
+ is handled by the current backend.
+ :params ttl: Key's timeout in seconds. 0 means no timeout.
:type ttl: int
- :params not_exists: If True, the key will be set
- if it doesn't exist. Otherwise,
- it'll always be set.
+ :params not_exists: If True, the key will be set if it doesn't exist.
+ Otherwise, it'll always be set.
:type not_exists: bool
:returns: True if the operation succeeds, False otherwise.
:params key: Item key as string.
:type key: `unicode string`
- :params value: Value to assign to the key. This
- can be anything that is handled
- by the current backend.
+ :params value: Value to assign to the key. This can be anything that
+ is handled by the current backend.
"""
try:
return self[key]
def get(self, key, default=None):
"""Gets one item from the cache
- NOTE: Thread-safety is required and it has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and it has to be guaranteed
+ by the backend implementation.
- :params key: Key for the item to retrieve
- from the cache.
+ :params key: Key for the item to retrieve from the cache.
:params default: The default value to return.
- :returns: `key`'s value in the cache if it exists,
- otherwise `default` should be returned.
+ :returns: `key`'s value in the cache if it exists, otherwise
+ `default` should be returned.
"""
return self._get(key, default)
def __delitem__(self, key):
"""Removes an item from cache.
- NOTE: Thread-safety is required and it has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and it has to be guaranteed by
+ the backend implementation.
:params key: The key to remove.
def clear(self):
"""Removes all items from the cache.
- NOTE: Thread-safety is required and it has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and it has to be guaranteed by
+ the backend implementation.
"""
return self._clear()
"""Increments the value for a key
:params key: The key for the value to be incremented
- :params delta: Number of units by which to increment
- the value. Pass a negative number to
- decrement the value.
+ :params delta: Number of units by which to increment the value.
+ Pass a negative number to decrement the value.
:returns: The new value
"""
def append_tail(self, key, tail):
"""Appends `tail` to `key`'s value.
- :params key: The key of the value to which
- `tail` should be appended.
- :params tail: The list of values to append to the
- original.
+ :params key: The key of the value to which `tail` should be appended.
+ :params tail: The list of values to append to the original.
:returns: The new value
"""
def append(self, key, value):
"""Appends `value` to `key`'s value.
- :params key: The key of the value to which
- `tail` should be appended.
- :params value: The value to append to the
- original.
+ :params key: The key of the value to which `tail` should be appended.
+ :params value: The value to append to the original.
:returns: The new value
"""
:params key: The key to verify.
- :returns: True if the key exists,
- otherwise False.
+ :returns: True if the key exists, otherwise False.
"""
@abc.abstractmethod
"""Gets keys' value from cache
:params keys: List of keys to retrieve.
- :params default: The default value to return
- for each key that is not in
- the cache.
+ :params default: The default value to return for each key that is not
+ in the cache.
:returns: A generator of (key, value)
"""
def set_many(self, data, ttl=None):
"""Puts several items into the cache at once
- Depending on the backend, this operation may or may
- not be efficient. The default implementation calls
- set for each (key, value) pair passed, other backends
- support set_many operations as part of their protocols.
+ Depending on the backend, this operation may or may not be efficient.
+ The default implementation calls set for each (key, value) pair
+ passed, other backends support set_many operations as part of their
+ protocols.
- :params data: A dictionary like {key: val} to store
- in the cache.
+ :params data: A dictionary like {key: val} to store in the cache.
:params ttl: Key's timeout in seconds.
"""
# License for the specific language governing permissions and limitations
# under the License.
-
import contextlib
import errno
import functools
+import logging
import os
import shutil
import subprocess
from oslo.config import cfg
from neutron.openstack.common import fileutils
-from neutron.openstack.common.gettextutils import _
-from neutron.openstack.common import local
-from neutron.openstack.common import log as logging
+from neutron.openstack.common._i18n import _, _LE, _LI
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
- help='Whether to disable inter-process locks'),
+ help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path',
default=os.environ.get("NEUTRON_LOCK_PATH"),
- help=('Directory to use for lock files.'))
+ help='Directory to use for lock files.')
]
cfg.set_defaults(util_opts, lock_path=lock_path)
-class _InterProcessLock(object):
+class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
self.lockfile = None
self.fname = name
- def __enter__(self):
+ def acquire(self):
+ basedir = os.path.dirname(self.fname)
+
+ if not os.path.exists(basedir):
+ fileutils.ensure_tree(basedir)
+ LOG.info(_LI('Created lock path: %s'), basedir)
+
self.lockfile = open(self.fname, 'w')
while True:
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
- return self
+ LOG.debug('Got file lock "%s"', self.fname)
+ return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
- raise
+ raise threading.ThreadError(_("Unable to acquire lock on"
+ " `%(filename)s` due to"
+ " %(exception)s") %
+ {'filename': self.fname,
+ 'exception': e})
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __enter__(self):
+ self.acquire()
+ return self
+
+ def release(self):
try:
self.unlock()
self.lockfile.close()
+ LOG.debug('Released file lock "%s"', self.fname)
except IOError:
- LOG.exception(_("Could not release the acquired lock `%s`"),
+ LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname)
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.release()
+
+ def exists(self):
+ return os.path.exists(self.fname)
+
def trylock(self):
raise NotImplementedError()
raise NotImplementedError()
-class _WindowsLock(_InterProcessLock):
+class _WindowsLock(_FileLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
-class _PosixLock(_InterProcessLock):
+class _FcntlLock(_FileLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
InterProcessLock = _WindowsLock
else:
import fcntl
- InterProcessLock = _PosixLock
+ InterProcessLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
+def _get_lock_path(name, lock_file_prefix, lock_path=None):
+ # NOTE(mikal): the lock name cannot contain directory
+ # separators
+ name = name.replace(os.sep, '_')
+ if lock_file_prefix:
+ sep = '' if lock_file_prefix.endswith('-') else '-'
+ name = '%s%s%s' % (lock_file_prefix, sep, name)
+
+ local_lock_path = lock_path or CONF.lock_path
+
+ if not local_lock_path:
+ raise cfg.RequiredOptError('lock_path')
+
+ return os.path.join(local_lock_path, name)
+
+
+def external_lock(name, lock_file_prefix=None, lock_path=None):
+ LOG.debug('Attempting to grab external lock "%(lock)s"',
+ {'lock': name})
+
+ lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
+
+ return InterProcessLock(lock_file_path)
+
+
+def remove_external_lock_file(name, lock_file_prefix=None):
+ """Remove an external lock file when it's not used anymore
+ This will be helpful when we have a lot of lock files
+ """
+ with internal_lock(name):
+ lock_file_path = _get_lock_path(name, lock_file_prefix)
+ try:
+ os.remove(lock_file_path)
+ except OSError:
+ LOG.info(_LI('Failed to remove file %(file)s'),
+ {'file': lock_file_path})
+
+
+def internal_lock(name):
+ with _semaphores_lock:
+ try:
+ sem = _semaphores[name]
+ LOG.debug('Using existing semaphore "%s"', name)
+ except KeyError:
+ sem = threading.Semaphore()
+ _semaphores[name] = sem
+ LOG.debug('Created new semaphore "%s"', name)
+
+ return sem
+
+
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
-
- :param lock_path: The lock_path keyword argument is used to specify a
- special location for external lock files to live. If nothing is set, then
- CONF.lock_path is used as a default.
"""
- with _semaphores_lock:
- try:
- sem = _semaphores[name]
- except KeyError:
- sem = threading.Semaphore()
- _semaphores[name] = sem
-
- with sem:
- LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
-
- # NOTE(mikal): I know this looks odd
- if not hasattr(local.strong_store, 'locks_held'):
- local.strong_store.locks_held = []
- local.strong_store.locks_held.append(name)
-
+ int_lock = internal_lock(name)
+ with int_lock:
+ LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
try:
if external and not CONF.disable_process_locking:
- LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
- {'lock': name})
-
- # We need a copy of lock_path because it is non-local
- local_lock_path = lock_path or CONF.lock_path
- if not local_lock_path:
- raise cfg.RequiredOptError('lock_path')
-
- if not os.path.exists(local_lock_path):
- fileutils.ensure_tree(local_lock_path)
- LOG.info(_('Created lock path: %s'), local_lock_path)
-
- def add_prefix(name, prefix):
- if not prefix:
- return name
- sep = '' if prefix.endswith('-') else '-'
- return '%s%s%s' % (prefix, sep, name)
-
- # NOTE(mikal): the lock name cannot contain directory
- # separators
- lock_file_name = add_prefix(name.replace(os.sep, '_'),
- lock_file_prefix)
-
- lock_file_path = os.path.join(local_lock_path, lock_file_name)
-
- try:
- lock = InterProcessLock(lock_file_path)
- with lock as lock:
- LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
- {'lock': name, 'path': lock_file_path})
- yield lock
- finally:
- LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
- {'lock': name, 'path': lock_file_path})
+ ext_lock = external_lock(name, lock_file_prefix, lock_path)
+ with ext_lock:
+ yield ext_lock
else:
- yield sem
-
+ yield int_lock
finally:
- local.strong_store.locks_held.remove(name)
+ LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def inner(*args, **kwargs):
try:
with lock(name, lock_file_prefix, external, lock_path):
- LOG.debug(_('Got semaphore / lock "%(function)s"'),
+ LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__})
return f(*args, **kwargs)
finally:
- LOG.debug(_('Semaphore / lock released "%(function)s"'),
+ LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__})
return inner
return wrap