]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Integrate oslo's periodic tasks.
authorSvetlana Shturm <sshturm@mirantis.com>
Mon, 10 Jun 2013 14:02:26 +0000 (15:02 +0100)
committerSvetlana Shturm <sshturm@mirantis.com>
Tue, 11 Jun 2013 06:54:15 +0000 (07:54 +0100)
Cinder use old style invocation of periodic tasks, which not based on oslo library.
It will be better to use main idea of periodic tasks for all services.

Blueprint: oslo-periodic-tasks

Change-Id: I6ac1ca28abefcc9d0bbfd41873c8f57d40b4a97a

cinder/manager.py
cinder/openstack/common/periodic_task.py [new file with mode: 0644]
cinder/volume/manager.py
openstack-common.conf

index 5f52e56865a2f7e0b5c04e90a1862378d8a888a8..96270e530a939681b8d86741a57db501b4ca130f 100644 (file)
@@ -56,6 +56,7 @@ This module provides Manager, a base class for managers.
 from cinder.db import base
 from cinder import flags
 from cinder.openstack.common import log as logging
+from cinder.openstack.common import periodic_task
 from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import version
@@ -67,70 +68,7 @@ FLAGS = flags.FLAGS
 LOG = logging.getLogger(__name__)
 
 
-def periodic_task(*args, **kwargs):
-    """Decorator to indicate that a method is a periodic task.
-
-    This decorator can be used in two ways:
-
-        1. Without arguments '@periodic_task', this will be run on every tick
-           of the periodic scheduler.
-
-        2. With arguments, @periodic_task(ticks_between_runs=N), this will be
-           run on every N ticks of the periodic scheduler.
-    """
-    def decorator(f):
-        f._periodic_task = True
-        f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
-        return f
-
-    # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
-    # and without parens.
-    #
-    # In the 'with-parens' case (with kwargs present), this function needs to
-    # return a decorator function since the interpreter will invoke it like:
-    #
-    #   periodic_task(*args, **kwargs)(f)
-    #
-    # In the 'without-parens' case, the original function will be passed
-    # in as the first argument, like:
-    #
-    #   periodic_task(f)
-    if kwargs:
-        return decorator
-    else:
-        return decorator(args[0])
-
-
-class ManagerMeta(type):
-    def __init__(cls, names, bases, dict_):
-        """Metaclass that allows us to collect decorated periodic tasks."""
-        super(ManagerMeta, cls).__init__(names, bases, dict_)
-
-        # NOTE(sirp): if the attribute is not present then we must be the base
-        # class, so, go ahead an initialize it. If the attribute is present,
-        # then we're a subclass so make a copy of it so we don't step on our
-        # parent's toes.
-        try:
-            cls._periodic_tasks = cls._periodic_tasks[:]
-        except AttributeError:
-            cls._periodic_tasks = []
-
-        try:
-            cls._ticks_to_skip = cls._ticks_to_skip.copy()
-        except AttributeError:
-            cls._ticks_to_skip = {}
-
-        for value in cls.__dict__.values():
-            if getattr(value, '_periodic_task', False):
-                task = value
-                name = task.__name__
-                cls._periodic_tasks.append((name, task))
-                cls._ticks_to_skip[name] = task._ticks_between_runs
-
-
-class Manager(base.Base):
-    __metaclass__ = ManagerMeta
-
+class Manager(base.Base, periodic_task.PeriodicTasks):
     # Set RPC API version to 1.0 by default.
     RPC_API_VERSION = '1.0'
 
@@ -150,26 +88,7 @@ class Manager(base.Base):
 
     def periodic_tasks(self, context, raise_on_error=False):
         """Tasks to be run at a periodic interval."""
-        for task_name, task in self._periodic_tasks:
-            full_task_name = '.'.join([self.__class__.__name__, task_name])
-
-            ticks_to_skip = self._ticks_to_skip[task_name]
-            if ticks_to_skip > 0:
-                LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
-                            " ticks left until next run"), locals())
-                self._ticks_to_skip[task_name] -= 1
-                continue
-
-            self._ticks_to_skip[task_name] = task._ticks_between_runs
-            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
-
-            try:
-                task(self, context)
-            except Exception as e:
-                if raise_on_error:
-                    raise
-                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
-                              locals())
+        return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
 
     def init_host(self):
         """Handle initialization if this is a standalone service.
@@ -209,7 +128,7 @@ class SchedulerDependentManager(Manager):
         """Remember these capabilities to send on next periodic update."""
         self.last_capabilities = capabilities
 
-    @periodic_task
+    @periodic_task.periodic_task
     def _publish_service_capabilities(self, context):
         """Pass data back to the scheduler at a periodic interval."""
         if self.last_capabilities:
diff --git a/cinder/openstack/common/periodic_task.py b/cinder/openstack/common/periodic_task.py
new file mode 100644 (file)
index 0000000..1693bd8
--- /dev/null
@@ -0,0 +1,188 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import datetime
+import time
+
+from oslo.config import cfg
+
+from cinder.openstack.common.gettextutils import _
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
+
+
+periodic_opts = [
+    cfg.BoolOpt('run_external_periodic_tasks',
+                default=True,
+                help=('Some periodic tasks can be run in a separate process. '
+                      'Should we run them here?')),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(periodic_opts)
+
+LOG = logging.getLogger(__name__)
+
+DEFAULT_INTERVAL = 60.0
+
+
+class InvalidPeriodicTaskArg(Exception):
+    message = _("Unexpected argument for periodic task creation: %(arg)s.")
+
+
+def periodic_task(*args, **kwargs):
+    """Decorator to indicate that a method is a periodic task.
+
+    This decorator can be used in two ways:
+
+        1. Without arguments '@periodic_task', this will be run on every cycle
+           of the periodic scheduler.
+
+        2. With arguments:
+           @periodic_task(spacing=N [, run_immediately=[True|False]])
+           this will be run on approximately every N seconds. If this number is
+           negative the periodic task will be disabled. If the run_immediately
+           argument is provided and has a value of 'True', the first run of the
+           task will be shortly after task scheduler starts.  If
+           run_immediately is omitted or set to 'False', the first time the
+           task runs will be approximately N seconds after the task scheduler
+           starts.
+    """
+    def decorator(f):
+        # Test for old style invocation
+        if 'ticks_between_runs' in kwargs:
+            raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
+
+        # Control if run at all
+        f._periodic_task = True
+        f._periodic_external_ok = kwargs.pop('external_process_ok', False)
+        if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
+            f._periodic_enabled = False
+        else:
+            f._periodic_enabled = kwargs.pop('enabled', True)
+
+        # Control frequency
+        f._periodic_spacing = kwargs.pop('spacing', 0)
+        f._periodic_immediate = kwargs.pop('run_immediately', False)
+        if f._periodic_immediate:
+            f._periodic_last_run = None
+        else:
+            f._periodic_last_run = timeutils.utcnow()
+        return f
+
+    # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
+    # and without parens.
+    #
+    # In the 'with-parens' case (with kwargs present), this function needs to
+    # return a decorator function since the interpreter will invoke it like:
+    #
+    #   periodic_task(*args, **kwargs)(f)
+    #
+    # In the 'without-parens' case, the original function will be passed
+    # in as the first argument, like:
+    #
+    #   periodic_task(f)
+    if kwargs:
+        return decorator
+    else:
+        return decorator(args[0])
+
+
+class _PeriodicTasksMeta(type):
+    def __init__(cls, names, bases, dict_):
+        """Metaclass that allows us to collect decorated periodic tasks."""
+        super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
+
+        # NOTE(sirp): if the attribute is not present then we must be the base
+        # class, so, go ahead an initialize it. If the attribute is present,
+        # then we're a subclass so make a copy of it so we don't step on our
+        # parent's toes.
+        try:
+            cls._periodic_tasks = cls._periodic_tasks[:]
+        except AttributeError:
+            cls._periodic_tasks = []
+
+        try:
+            cls._periodic_last_run = cls._periodic_last_run.copy()
+        except AttributeError:
+            cls._periodic_last_run = {}
+
+        try:
+            cls._periodic_spacing = cls._periodic_spacing.copy()
+        except AttributeError:
+            cls._periodic_spacing = {}
+
+        for value in cls.__dict__.values():
+            if getattr(value, '_periodic_task', False):
+                task = value
+                name = task.__name__
+
+                if task._periodic_spacing < 0:
+                    LOG.info(_('Skipping periodic task %(task)s because '
+                               'its interval is negative'),
+                             {'task': name})
+                    continue
+                if not task._periodic_enabled:
+                    LOG.info(_('Skipping periodic task %(task)s because '
+                               'it is disabled'),
+                             {'task': name})
+                    continue
+
+                # A periodic spacing of zero indicates that this task should
+                # be run every pass
+                if task._periodic_spacing == 0:
+                    task._periodic_spacing = None
+
+                cls._periodic_tasks.append((name, task))
+                cls._periodic_spacing[name] = task._periodic_spacing
+                cls._periodic_last_run[name] = task._periodic_last_run
+
+
+class PeriodicTasks(object):
+    __metaclass__ = _PeriodicTasksMeta
+
+    def run_periodic_tasks(self, context, raise_on_error=False):
+        """Tasks to be run at a periodic interval."""
+        idle_for = DEFAULT_INTERVAL
+        for task_name, task in self._periodic_tasks:
+            full_task_name = '.'.join([self.__class__.__name__, task_name])
+
+            now = timeutils.utcnow()
+            spacing = self._periodic_spacing[task_name]
+            last_run = self._periodic_last_run[task_name]
+
+            # If a periodic task is _nearly_ due, then we'll run it early
+            if spacing is not None and last_run is not None:
+                due = last_run + datetime.timedelta(seconds=spacing)
+                if not timeutils.is_soon(due, 0.2):
+                    idle_for = min(idle_for, timeutils.delta_seconds(now, due))
+                    continue
+
+            if spacing is not None:
+                idle_for = min(idle_for, spacing)
+
+            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+            self._periodic_last_run[task_name] = timeutils.utcnow()
+
+            try:
+                task(self, context)
+            except Exception as e:
+                if raise_on_error:
+                    raise
+                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+                              locals())
+            time.sleep(0)
+
+        return idle_for
index 3ed0fb90e996d5e7fe1d85b203e2e6fe6128b52a..97bcd958e25c99b72313d6a1097399b264987670 100644 (file)
@@ -50,6 +50,7 @@ from cinder import manager
 from cinder.openstack.common import excutils
 from cinder.openstack.common import importutils
 from cinder.openstack.common import log as logging
+from cinder.openstack.common import periodic_task
 from cinder.openstack.common import timeutils
 from cinder.openstack.common import uuidutils
 from cinder import quota
@@ -705,7 +706,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         volume_ref = self.db.volume_get(context, volume_id)
         self.driver.accept_transfer(volume_ref)
 
-    @manager.periodic_task
+    @periodic_task.periodic_task
     def _report_driver_status(self, context):
         LOG.info(_("Updating volume status"))
         volume_stats = self.driver.get_volume_stats(refresh=True)
index b065f811495873f763856aa49f332b0102b990ad..2a183610c150b5a120221f2b2c0d3fdd73ebabc4 100644 (file)
@@ -18,6 +18,7 @@ module=log
 module=network_utils
 module=notifier
 module=patch_tox_venv
+module=periodic_task
 module=policy
 module=processutils
 module=redhat-eventlet.patch