from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
from oslo_utils import importutils
from neutron.agent.linux import dhcp
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
-from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
import sys
from oslo_config import cfg
+from oslo_service import service
from neutron.agent.common import config
from neutron.agent.dhcp import config as dhcp_config
from neutron.agent.metadata import config as metadata_config
from neutron.common import config as common_config
from neutron.common import topics
-from neutron.openstack.common import service
from neutron import service as neutron_service
- service.launch(server).wait()
+ service.launch(cfg.CONF, server).wait()
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
+from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from neutron import context as n_context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
-from neutron.openstack.common import loopingcall
-from neutron.openstack.common import periodic_task
from \
import sys
from oslo_config import cfg
+from oslo_service import service
from neutron.agent.common import config
from neutron.agent.l3 import config as l3_config
from neutron.agent.metadata import config as metadata_config
from neutron.common import config as common_config
from neutron.common import topics
-from neutron.openstack.common import service
from neutron import service as neutron_service
- service.launch(server).wait()
+ service.launch(cfg.CONF, server).wait()
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
import six
import six.moves.urllib.parse as urlparse
import webob
from neutron import context
from neutron.i18n import _LE, _LW
from neutron.openstack.common.cache import cache
-from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
+from oslo_service import service
from neutron.common import exceptions
from neutron import context
-from neutron.openstack.common import service
LOG = logging.getLogger(__name__)
from oslo_config import cfg
from oslo_log import log as logging
+from oslo_service import loopingcall
from oslo_utils import timeutils
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.extensions import agent as ext_agent
from neutron.extensions import dhcpagentscheduler
from neutron.i18n import _LE, _LI, _LW
-from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import periodic_task
from oslo_utils import importutils
import six
from neutron.common import utils
from neutron.i18n import _LE, _LI
-from neutron.openstack.common import periodic_task
from neutron.plugins.common import constants
from stevedore import driver
if not host:
host = = host
- super(Manager, self).__init__()
+ conf = getattr(self, "conf", cfg.CONF)
+ super(Manager, self).__init__(conf)
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)
+++ /dev/null
-# Copyright (c) 2012 OpenStack Foundation.
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from __future__ import print_function
-import copy
-import errno
-import gc
-import logging
-import os
-import pprint
-import socket
-import sys
-import traceback
-import eventlet.backdoor
-import greenlet
-from oslo_config import cfg
-from neutron.openstack.common._i18n import _LI
-help_for_backdoor_port = (
- "Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
- "in listening on a random tcp port number; <port> results in listening "
- "on the specified port number (and not enabling backdoor if that port "
- "is in use); and <start>:<end> results in listening on the smallest "
- "unused port number within the specified range of port numbers. The "
- "chosen port is displayed in the service's log file.")
-eventlet_backdoor_opts = [
- cfg.StrOpt('backdoor_port',
- help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
-CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-def list_opts():
- """Entry point for oslo-config-generator.
- """
- return [(None, copy.deepcopy(eventlet_backdoor_opts))]
-class EventletBackdoorConfigValueError(Exception):
- def __init__(self, port_range, help_msg, ex):
- msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
- '%(help)s' %
- {'range': port_range, 'ex': ex, 'help': help_msg})
- super(EventletBackdoorConfigValueError, self).__init__(msg)
- self.port_range = port_range
-def _dont_use_this():
- print("Don't use this, just disconnect instead")
-def _find_objects(t):
- return [o for o in gc.get_objects() if isinstance(o, t)]
-def _print_greenthreads():
- for i, gt in enumerate(_find_objects(greenlet.greenlet)):
- print(i, gt)
- traceback.print_stack(gt.gr_frame)
- print()
-def _print_nativethreads():
- for threadId, stack in sys._current_frames().items():
- print(threadId)
- traceback.print_stack(stack)
- print()
-def _parse_port_range(port_range):
- if ':' not in port_range:
- start, end = port_range, port_range
- else:
- start, end = port_range.split(':', 1)
- try:
- start, end = int(start), int(end)
- if end < start:
- raise ValueError
- return start, end
- except ValueError as ex:
- raise EventletBackdoorConfigValueError(port_range, ex,
- help_for_backdoor_port)
-def _listen(host, start_port, end_port, listen_func):
- try_port = start_port
- while True:
- try:
- return listen_func((host, try_port))
- except socket.error as exc:
- if (exc.errno != errno.EADDRINUSE or
- try_port >= end_port):
- raise
- try_port += 1
-def initialize_if_enabled():
- backdoor_locals = {
- 'exit': _dont_use_this, # So we don't exit the entire process
- 'quit': _dont_use_this, # So we don't exit the entire process
- 'fo': _find_objects,
- 'pgt': _print_greenthreads,
- 'pnt': _print_nativethreads,
- }
- if CONF.backdoor_port is None:
- return None
- start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
- # NOTE(johannes): The standard sys.displayhook will print the value of
- # the last expression and set it to __builtin__._, which overwrites
- # the __builtin__._ that gettext sets. Let's switch to using pprint
- # since it won't interact poorly with gettext, and it's easier to
- # read the output too.
- def displayhook(val):
- if val is not None:
- pprint.pprint(val)
- sys.displayhook = displayhook
- sock = _listen('localhost', start_port, end_port, eventlet.listen)
- # In the case of backdoor port being zero, a port number is assigned by
- # listen(). In any case, pull the port number out here.
- port = sock.getsockname()[1]
- _LI('Eventlet backdoor listening on %(port)s for process %(pid)d'),
- {'port': port, 'pid': os.getpid()}
- )
- eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
- locals=backdoor_locals)
- return port
+++ /dev/null
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# Copyright 2011 Justin Santa Barbara
-# All Rights Reserved.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import sys
-import time
-from eventlet import event
-from eventlet import greenthread
-from neutron.openstack.common._i18n import _LE, _LW
-LOG = logging.getLogger(__name__)
-# NOTE(zyluo): This lambda function was declared to avoid mocking collisions
-# with time.time() called in the standard logging module
-# during unittests.
-_ts = lambda: time.time()
-class LoopingCallDone(Exception):
- """Exception to break out and stop a LoopingCallBase.
- The poll-function passed to LoopingCallBase can raise this exception to
- break out of the loop normally. This is somewhat analogous to
- StopIteration.
- An optional return-value can be included as the argument to the exception;
- this return-value will be returned by LoopingCallBase.wait()
- """
- def __init__(self, retvalue=True):
- """:param retvalue: Value that LoopingCallBase.wait() should return."""
- self.retvalue = retvalue
-class LoopingCallBase(object):
- def __init__(self, f=None, *args, **kw):
- self.args = args
- = kw
- self.f = f
- self._running = False
- self.done = None
- def stop(self):
- self._running = False
- def wait(self):
- return self.done.wait()
-class FixedIntervalLoopingCall(LoopingCallBase):
- """A fixed interval looping call."""
- def start(self, interval, initial_delay=None):
- self._running = True
- done = event.Event()
- def _inner():
- if initial_delay:
- greenthread.sleep(initial_delay)
- try:
- while self._running:
- start = _ts()
- self.f(*self.args, **
- end = _ts()
- if not self._running:
- break
- delay = end - start - interval
- if delay > 0:
- LOG.warning(_LW('task %(func_name)r run outlasted '
- 'interval by %(delay).2f sec'),
- {'func_name': self.f, 'delay': delay})
- greenthread.sleep(-delay if delay < 0 else 0)
- except LoopingCallDone as e:
- self.stop()
- done.send(e.retvalue)
- except Exception:
- LOG.exception(_LE('in fixed duration looping call'))
- done.send_exception(*sys.exc_info())
- return
- else:
- done.send(True)
- self.done = done
- greenthread.spawn_n(_inner)
- return self.done
-class DynamicLoopingCall(LoopingCallBase):
- """A looping call which sleeps until the next known event.
- The function called should return how long to sleep for before being
- called again.
- """
- def start(self, initial_delay=None, periodic_interval_max=None):
- self._running = True
- done = event.Event()
- def _inner():
- if initial_delay:
- greenthread.sleep(initial_delay)
- try:
- while self._running:
- idle = self.f(*self.args, **
- if not self._running:
- break
- if periodic_interval_max is not None:
- idle = min(idle, periodic_interval_max)
- LOG.debug('Dynamic looping call %(func_name)r sleeping '
- 'for %(idle).02f seconds',
- {'func_name': self.f, 'idle': idle})
- greenthread.sleep(idle)
- except LoopingCallDone as e:
- self.stop()
- done.send(e.retvalue)
- except Exception:
- LOG.exception(_LE('in dynamic looping call'))
- done.send_exception(*sys.exc_info())
- return
- else:
- done.send(True)
- self.done = done
- greenthread.spawn(_inner)
- return self.done
+++ /dev/null
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import copy
-import logging
-import random
-import time
-from oslo_config import cfg
-import six
-from neutron.openstack.common._i18n import _, _LE, _LI
-periodic_opts = [
- cfg.BoolOpt('run_external_periodic_tasks',
- default=True,
- help='Some periodic tasks can be run in a separate process. '
- 'Should we run them here?'),
-CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-def list_opts():
- """Entry point for oslo-config-generator."""
- return [(None, copy.deepcopy(periodic_opts))]
-class InvalidPeriodicTaskArg(Exception):
- message = _("Unexpected argument for periodic task creation: %(arg)s.")
-def periodic_task(*args, **kwargs):
- """Decorator to indicate that a method is a periodic task.
- This decorator can be used in two ways:
- 1. Without arguments '@periodic_task', this will be run on the default
- interval of 60 seconds.
- 2. With arguments:
- @periodic_task(spacing=N [, run_immediately=[True|False]]
- [, name=[None|"string"])
- this will be run on approximately every N seconds. If this number is
- negative the periodic task will be disabled. If the run_immediately
- argument is provided and has a value of 'True', the first run of the
- task will be shortly after task scheduler starts. If
- run_immediately is omitted or set to 'False', the first time the
- task runs will be approximately N seconds after the task scheduler
- starts. If name is not provided, __name__ of function is used.
- """
- def decorator(f):
- # Test for old style invocation
- if 'ticks_between_runs' in kwargs:
- raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
- # Control if run at all
- f._periodic_task = True
- f._periodic_external_ok = kwargs.pop('external_process_ok', False)
- if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
- f._periodic_enabled = False
- else:
- f._periodic_enabled = kwargs.pop('enabled', True)
- f._periodic_name = kwargs.pop('name', f.__name__)
- # Control frequency
- f._periodic_spacing = kwargs.pop('spacing', 0)
- f._periodic_immediate = kwargs.pop('run_immediately', False)
- if f._periodic_immediate:
- f._periodic_last_run = None
- else:
- f._periodic_last_run = time.time()
- return f
- # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
- # and without parenthesis.
- #
- # In the 'with-parenthesis' case (with kwargs present), this function needs
- # to return a decorator function since the interpreter will invoke it like:
- #
- # periodic_task(*args, **kwargs)(f)
- #
- # In the 'without-parenthesis' case, the original function will be passed
- # in as the first argument, like:
- #
- # periodic_task(f)
- if kwargs:
- return decorator
- else:
- return decorator(args[0])
-class _PeriodicTasksMeta(type):
- def _add_periodic_task(cls, task):
- """Add a periodic task to the list of periodic tasks.
- The task should already be decorated by @periodic_task.
- :return: whether task was actually enabled
- """
- name = task._periodic_name
- if task._periodic_spacing < 0:
-'Skipping periodic task %(task)s because '
- 'its interval is negative'),
- {'task': name})
- return False
- if not task._periodic_enabled:
-'Skipping periodic task %(task)s because '
- 'it is disabled'),
- {'task': name})
- return False
- # A periodic spacing of zero indicates that this task should
- # be run on the default interval to avoid running too
- # frequently.
- if task._periodic_spacing == 0:
- task._periodic_spacing = DEFAULT_INTERVAL
- cls._periodic_tasks.append((name, task))
- cls._periodic_spacing[name] = task._periodic_spacing
- return True
- def __init__(cls, names, bases, dict_):
- """Metaclass that allows us to collect decorated periodic tasks."""
- super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
- # NOTE(sirp): if the attribute is not present then we must be the base
- # class, so, go ahead an initialize it. If the attribute is present,
- # then we're a subclass so make a copy of it so we don't step on our
- # parent's toes.
- try:
- cls._periodic_tasks = cls._periodic_tasks[:]
- except AttributeError:
- cls._periodic_tasks = []
- try:
- cls._periodic_spacing = cls._periodic_spacing.copy()
- except AttributeError:
- cls._periodic_spacing = {}
- for value in cls.__dict__.values():
- if getattr(value, '_periodic_task', False):
- cls._add_periodic_task(value)
-def _nearest_boundary(last_run, spacing):
- """Find nearest boundary which is in the past, which is a multiple of the
- spacing with the last run as an offset.
- Eg if last run was 10 and spacing was 7, the new last run could be: 17, 24,
- 31, 38...
- 0% to 5% of the spacing value will be added to this value to ensure tasks
- do not synchronize. This jitter is rounded to the nearest second, this
- means that spacings smaller than 20 seconds will not have jitter.
- """
- current_time = time.time()
- if last_run is None:
- return current_time
- delta = current_time - last_run
- offset = delta % spacing
- # Add up to 5% jitter
- jitter = int(spacing * (random.random() / 20))
- return current_time - offset + jitter
-class PeriodicTasks(object):
- def __init__(self):
- super(PeriodicTasks, self).__init__()
- self._periodic_last_run = {}
- for name, task in self._periodic_tasks:
- self._periodic_last_run[name] = task._periodic_last_run
- def add_periodic_task(self, task):
- """Add a periodic task to the list of periodic tasks.
- The task should already be decorated by @periodic_task.
- """
- if self.__class__._add_periodic_task(task):
- self._periodic_last_run[task._periodic_name] = (
- task._periodic_last_run)
- def run_periodic_tasks(self, context, raise_on_error=False):
- """Tasks to be run at a periodic interval."""
- for task_name, task in self._periodic_tasks:
- full_task_name = '.'.join([self.__class__.__name__, task_name])
- spacing = self._periodic_spacing[task_name]
- last_run = self._periodic_last_run[task_name]
- # Check if due, if not skip
- idle_for = min(idle_for, spacing)
- if last_run is not None:
- delta = last_run + spacing - time.time()
- if delta > 0:
- idle_for = min(idle_for, delta)
- continue
- LOG.debug("Running periodic task %(full_task_name)s",
- {"full_task_name": full_task_name})
- self._periodic_last_run[task_name] = _nearest_boundary(
- last_run, spacing)
- try:
- task(self, context)
- except Exception:
- if raise_on_error:
- raise
- LOG.exception(_LE("Error during %(full_task_name)s"),
- {"full_task_name": full_task_name})
- time.sleep(0)
- return idle_for
+++ /dev/null
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# Copyright 2011 Justin Santa Barbara
-# All Rights Reserved.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""Generic Node base class for all workers that run on hosts."""
-import errno
-import io
-import logging
-import os
-import random
-import signal
-import sys
-import time
-import eventlet
-from eventlet import event
-from oslo_config import cfg
-from neutron.openstack.common import eventlet_backdoor
-from neutron.openstack.common._i18n import _LE, _LI, _LW
-from neutron.openstack.common import systemd
-from neutron.openstack.common import threadgroup
-CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-def _sighup_supported():
- return hasattr(signal, 'SIGHUP')
-def _is_daemon():
- # The process group for a foreground process will match the
- # process group of the controlling terminal. If those values do
- # not match, or ioctl() fails on the stdout file handle, we assume
- # the process is running in the background as a daemon.
- #
- try:
- is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
- except io.UnsupportedOperation:
- # Could not get the fileno for stdout, so we must be a daemon.
- is_daemon = True
- except OSError as err:
- if err.errno == errno.ENOTTY:
- # Assume we are a daemon because there is no terminal.
- is_daemon = True
- else:
- raise
- return is_daemon
-def _is_sighup_and_daemon(signo):
- if not (_sighup_supported() and signo == signal.SIGHUP):
- # Avoid checking if we are a daemon, because the signal isn't
- return False
- return _is_daemon()
-def _signo_to_signame(signo):
- signals = {signal.SIGTERM: 'SIGTERM',
- signal.SIGINT: 'SIGINT'}
- if _sighup_supported():
- signals[signal.SIGHUP] = 'SIGHUP'
- return signals[signo]
-def _set_signals_handler(handler):
- signal.signal(signal.SIGTERM, handler)
- signal.signal(signal.SIGINT, handler)
- if _sighup_supported():
- signal.signal(signal.SIGHUP, handler)
-class Launcher(object):
- """Launch one or more services and wait for them to complete."""
- def __init__(self):
- """Initialize the service launcher.
- :returns: None
- """
- = Services()
- self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
- def launch_service(self, service):
- """Load and start the given service.
- :param service: The service you would like to start.
- :returns: None
- """
- service.backdoor_port = self.backdoor_port
- def stop(self):
- """Stop all services which are currently running.
- :returns: None
- """
- def wait(self):
- """Waits until all services have been stopped, and then returns.
- :returns: None
- """
- def restart(self):
- """Reload config files and restart service.
- :returns: None
- """
- cfg.CONF.reload_config_files()
-class SignalExit(SystemExit):
- def __init__(self, signo, exccode=1):
- super(SignalExit, self).__init__(exccode)
- self.signo = signo
-class ServiceLauncher(Launcher):
- def _handle_signal(self, signo, frame):
- # Allow the process to be killed again and die from natural causes
- _set_signals_handler(signal.SIG_DFL)
- raise SignalExit(signo)
- def handle_signal(self):
- _set_signals_handler(self._handle_signal)
- def _wait_for_exit_or_signal(self, ready_callback=None):
- status = None
- signo = 0
- LOG.debug('Full set of CONF:')
- CONF.log_opt_values(LOG, logging.DEBUG)
- try:
- if ready_callback:
- ready_callback()
- super(ServiceLauncher, self).wait()
- except SignalExit as exc:
- signame = _signo_to_signame(exc.signo)
-'Caught %s, exiting'), signame)
- status = exc.code
- signo = exc.signo
- except SystemExit as exc:
- status = exc.code
- finally:
- self.stop()
- return status, signo
- def wait(self, ready_callback=None):
- systemd.notify_once()
- while True:
- self.handle_signal()
- status, signo = self._wait_for_exit_or_signal(ready_callback)
- if not _is_sighup_and_daemon(signo):
- return status
- self.restart()
-class ServiceWrapper(object):
- def __init__(self, service, workers):
- self.service = service
- self.workers = workers
- self.children = set()
- self.forktimes = []
-class ProcessLauncher(object):
- _signal_handlers_set = set()
- @classmethod
- def _handle_class_signals(cls, *args, **kwargs):
- for handler in cls._signal_handlers_set:
- handler(*args, **kwargs)
- def __init__(self, wait_interval=0.01):
- """Constructor.
- :param wait_interval: The interval to sleep for between checks
- of child process exit.
- """
- self.children = {}
- self.sigcaught = None
- self.running = True
- self.wait_interval = wait_interval
- rfd, self.writepipe = os.pipe()
- self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
- self.handle_signal()
- def handle_signal(self):
- self._signal_handlers_set.add(self._handle_signal)
- _set_signals_handler(self._handle_class_signals)
- def _handle_signal(self, signo, frame):
- self.sigcaught = signo
- self.running = False
- # Allow the process to be killed again and die from natural causes
- _set_signals_handler(signal.SIG_DFL)
- def _pipe_watcher(self):
- # This will block until the write end is closed when the parent
- # dies unexpectedly
-'Parent process has died unexpectedly, exiting'))
- sys.exit(1)
- def _child_process_handle_signal(self):
- # Setup child signal handlers differently
- def _sighup(*args):
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
- raise SignalExit(signal.SIGHUP)
- # Parent signals with SIGTERM when it wants us to go away.
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- if _sighup_supported():
- signal.signal(signal.SIGHUP, _sighup)
- # Block SIGINT and let the parent send us a SIGTERM
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- def _child_wait_for_exit_or_signal(self, launcher):
- status = 0
- signo = 0
- # NOTE(johannes): All exceptions are caught to ensure this
- # doesn't fallback into the loop spawning children. It would
- # be bad for a child to spawn more children.
- try:
- launcher.wait()
- except SignalExit as exc:
- signame = _signo_to_signame(exc.signo)
-'Child caught %s, exiting'), signame)
- status = exc.code
- signo = exc.signo
- except SystemExit as exc:
- status = exc.code
- except BaseException:
- LOG.exception(_LE('Unhandled exception'))
- status = 2
- finally:
- launcher.stop()
- return status, signo
- def _child_process(self, service):
- self._child_process_handle_signal()
- # Reopen the eventlet hub to make sure we don't share an epoll
- # fd with parent and/or siblings, which would be bad
- eventlet.hubs.use_hub()
- # Close write to ensure only parent has it open
- os.close(self.writepipe)
- # Create greenthread to watch for parent to close pipe
- eventlet.spawn_n(self._pipe_watcher)
- # Reseed random number generator
- random.seed()
- launcher = Launcher()
- launcher.launch_service(service)
- return launcher
- def _start_child(self, wrap):
- if len(wrap.forktimes) > wrap.workers:
- # Limit ourselves to one process a second (over the period of
- # number of workers * 1 second). This will allow workers to
- # start up quickly but ensure we don't fork off children that
- # die instantly too quickly.
- if time.time() - wrap.forktimes[0] < wrap.workers:
-'Forking too fast, sleeping'))
- time.sleep(1)
- wrap.forktimes.pop(0)
- wrap.forktimes.append(time.time())
- pid = os.fork()
- if pid == 0:
- launcher = self._child_process(wrap.service)
- while True:
- self._child_process_handle_signal()
- status, signo = self._child_wait_for_exit_or_signal(launcher)
- if not _is_sighup_and_daemon(signo):
- break
- launcher.restart()
- os._exit(status)
-'Started child %d'), pid)
- wrap.children.add(pid)
- self.children[pid] = wrap
- return pid
- def launch_service(self, service, workers=1):
- wrap = ServiceWrapper(service, workers)
-'Starting %d workers'), wrap.workers)
- while self.running and len(wrap.children) < wrap.workers:
- self._start_child(wrap)
- def _wait_child(self):
- try:
- # Don't block if no child processes have exited
- pid, status = os.waitpid(0, os.WNOHANG)
- if not pid:
- return None
- except OSError as exc:
- if exc.errno not in (errno.EINTR, errno.ECHILD):
- raise
- return None
- if os.WIFSIGNALED(status):
- sig = os.WTERMSIG(status)
-'Child %(pid)d killed by signal %(sig)d'),
- dict(pid=pid, sig=sig))
- else:
- code = os.WEXITSTATUS(status)
-'Child %(pid)s exited with status %(code)d'),
- dict(pid=pid, code=code))
- if pid not in self.children:
- LOG.warning(_LW('pid %d not in child list'), pid)
- return None
- wrap = self.children.pop(pid)
- wrap.children.remove(pid)
- return wrap
- def _respawn_children(self):
- while self.running:
- wrap = self._wait_child()
- if not wrap:
- # Yield to other threads if no children have exited
- # Sleep for a short time to avoid excessive CPU usage
- # (see bug #1095346)
- eventlet.greenthread.sleep(self.wait_interval)
- continue
- while self.running and len(wrap.children) < wrap.workers:
- self._start_child(wrap)
- def wait(self):
- """Loop waiting on children to die and respawning as necessary."""
- systemd.notify_once()
- LOG.debug('Full set of CONF:')
- CONF.log_opt_values(LOG, logging.DEBUG)
- try:
- while True:
- self.handle_signal()
- self._respawn_children()
- # No signal means that stop was called. Don't clean up here.
- if not self.sigcaught:
- return
- signame = _signo_to_signame(self.sigcaught)
-'Caught %s, stopping children'), signame)
- if not _is_sighup_and_daemon(self.sigcaught):
- break
- cfg.CONF.reload_config_files()
- for service in set(
- [wrap.service for wrap in self.children.values()]):
- service.reset()
- for pid in self.children:
- os.kill(pid, signal.SIGHUP)
- self.running = True
- self.sigcaught = None
- except eventlet.greenlet.GreenletExit:
-"Wait called after thread killed. Cleaning up."))
- self.stop()
- def stop(self):
- """Terminate child processes and wait on each."""
- self.running = False
- for pid in self.children:
- try:
- os.kill(pid, signal.SIGTERM)
- except OSError as exc:
- if exc.errno != errno.ESRCH:
- raise
- # Wait for children to die
- if self.children:
-'Waiting on %d children to exit'), len(self.children))
- while self.children:
- self._wait_child()
-class Service(object):
- """Service object for binaries running on hosts."""
- def __init__(self, threads=1000):
- = threadgroup.ThreadGroup(threads)
- # signal that the service is done shutting itself down:
- self._done = event.Event()
- def reset(self):
- # NOTE(Fengqian): docs for Event.reset() recommend against using it
- self._done = event.Event()
- def start(self):
- pass
- def stop(self, graceful=False):
- # Signal that service cleanup is done:
- if not self._done.ready():
- self._done.send()
- def wait(self):
- self._done.wait()
-class Services(object):
- def __init__(self):
- = []
- = threadgroup.ThreadGroup()
- self.done = event.Event()
- def add(self, service):
-, service, self.done)
- def stop(self):
- # wait for graceful shutdown of services:
- for service in
- service.stop()
- service.wait()
- # Each service has performed cleanup, now signal that the run_service
- # wrapper threads can now die:
- if not self.done.ready():
- self.done.send()
- # reap threads:
- def wait(self):
- def restart(self):
- self.stop()
- self.done = event.Event()
- for restart_service in
- restart_service.reset()
-, restart_service, self.done)
- @staticmethod
- def run_service(service, done):
- """Service start wrapper.
- :param service: service to run
- :param done: event to wait on until a shutdown is triggered
- :returns: None
- """
- service.start()
- done.wait()
-def launch(service, workers=1):
- if workers is None or workers == 1:
- launcher = ServiceLauncher()
- launcher.launch_service(service)
- else:
- launcher = ProcessLauncher()
- launcher.launch_service(service, workers=workers)
- return launcher
+++ /dev/null
-# Copyright 2012-2014 Red Hat, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-Helper module for systemd service readiness notification.
-import logging
-import os
-import socket
-import sys
-LOG = logging.getLogger(__name__)
-def _abstractify(socket_name):
- if socket_name.startswith('@'):
- # abstract namespace socket
- socket_name = '\0%s' % socket_name[1:]
- return socket_name
-def _sd_notify(unset_env, msg):
- notify_socket = os.getenv('NOTIFY_SOCKET')
- if notify_socket:
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- try:
- sock.connect(_abstractify(notify_socket))
- sock.sendall(msg)
- if unset_env:
- del os.environ['NOTIFY_SOCKET']
- except EnvironmentError:
- LOG.debug("Systemd notification failed", exc_info=True)
- finally:
- sock.close()
-def notify():
- """Send notification to Systemd that service is ready.
- For details see
- """
- _sd_notify(False, 'READY=1')
-def notify_once():
- """Send notification once to Systemd that service is ready.
- Systemd sets NOTIFY_SOCKET environment variable with the name of the
- socket listening for notifications from services.
- This method removes the NOTIFY_SOCKET environment variable to ensure
- notification is sent only once.
- """
- _sd_notify(True, 'READY=1')
-def onready(notify_socket, timeout):
- """Wait for systemd style notification on the socket.
- :param notify_socket: local socket address
- :type notify_socket: string
- :param timeout: socket timeout
- :type timeout: float
- :returns: 0 service ready
- 1 service not ready
- 2 timeout occurred
- """
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- sock.settimeout(timeout)
- sock.bind(_abstractify(notify_socket))
- try:
- msg = sock.recv(512)
- except socket.timeout:
- return 2
- finally:
- sock.close()
- if 'READY=1' in msg:
- return 0
- else:
- return 1
-if __name__ == '__main__':
- # simple CLI for testing
- if len(sys.argv) == 1:
- notify()
- elif len(sys.argv) >= 2:
- timeout = float(sys.argv[1])
- notify_socket = os.getenv('NOTIFY_SOCKET')
- if notify_socket:
- retval = onready(notify_socket, timeout)
- sys.exit(retval)
+++ /dev/null
-# Copyright 2012 Red Hat, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import threading
-import eventlet
-from eventlet import greenpool
-from neutron.openstack.common._i18n import _LE
-from neutron.openstack.common import loopingcall
-LOG = logging.getLogger(__name__)
-def _thread_done(gt, *args, **kwargs):
- """Callback function to be passed to when we spawn()
- Calls the :class:`ThreadGroup` to notify if.
- """
- kwargs['group'].thread_done(kwargs['thread'])
-class Thread(object):
- """Wrapper around a greenthread, that holds a reference to the
- :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
- it has done so it can be removed from the threads list.
- """
- def __init__(self, thread, group):
- self.thread = thread
-, group=group, thread=self)
- def stop(self):
- self.thread.kill()
- def wait(self):
- return self.thread.wait()
- def link(self, func, *args, **kwargs):
-, *args, **kwargs)
-class ThreadGroup(object):
- """The point of the ThreadGroup class is to:
- * keep track of timers and greenthreads (making it easier to stop them
- when need be).
- * provide an easy API to add timers.
- """
- def __init__(self, thread_pool_size=10):
- self.pool = greenpool.GreenPool(thread_pool_size)
- self.threads = []
- self.timers = []
- def add_dynamic_timer(self, callback, initial_delay=None,
- periodic_interval_max=None, *args, **kwargs):
- timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
- timer.start(initial_delay=initial_delay,
- periodic_interval_max=periodic_interval_max)
- self.timers.append(timer)
- def add_timer(self, interval, callback, initial_delay=None,
- *args, **kwargs):
- pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
- pulse.start(interval=interval,
- initial_delay=initial_delay)
- self.timers.append(pulse)
- def add_thread(self, callback, *args, **kwargs):
- gt = self.pool.spawn(callback, *args, **kwargs)
- th = Thread(gt, self)
- self.threads.append(th)
- return th
- def thread_done(self, thread):
- self.threads.remove(thread)
- def _stop_threads(self):
- current = threading.current_thread()
- # Iterate over a copy of self.threads so thread_done doesn't
- # modify the list while we're iterating
- for x in self.threads[:]:
- if x is current:
- # don't kill the current thread.
- continue
- try:
- x.stop()
- except eventlet.greenlet.GreenletExit:
- pass
- except Exception:
- LOG.exception(_LE('Error stopping thread.'))
- def stop_timers(self):
- for x in self.timers:
- try:
- x.stop()
- except Exception:
- LOG.exception(_LE('Error stopping timer.'))
- self.timers = []
- def stop(self, graceful=False):
- """stop function has the option of graceful=True/False.
- * In case of graceful=True, wait for all threads to be finished.
- Never kill threads.
- * In case of graceful=False, kill threads immediately.
- """
- self.stop_timers()
- if graceful:
- # In case of graceful=True, wait for all threads to be
- # finished, never kill threads
- self.wait()
- else:
- # In case of graceful=False(Default), kill threads
- # immediately
- self._stop_threads()
- def wait(self):
- for x in self.timers:
- try:
- x.wait()
- except eventlet.greenlet.GreenletExit:
- pass
- except Exception:
- LOG.exception(_LE('Error waiting on ThreadGroup.'))
- current = threading.current_thread()
- # Iterate over a copy of self.threads so thread_done doesn't
- # modify the list while we're iterating
- for x in self.threads[:]:
- if x is current:
- continue
- try:
- x.wait()
- except eventlet.greenlet.GreenletExit:
- pass
- except Exception as ex:
- LOG.exception(ex)
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import topics
from neutron import context
from neutron.i18n import _LE
-from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
import six
from neutron.agent.common import ovs_lib
from neutron.common import utils as n_utils
from neutron.i18n import _LE, _LI
from neutron import context
-from neutron.openstack.common import loopingcall
from import constants
# under the License.
from oslo_log import log
+from oslo_service import loopingcall
from neutron.common import constants as n_constants
from neutron import context
from neutron.i18n import _LW
from neutron import manager
-from neutron.openstack.common import loopingcall
from neutron.plugins.ml2 import db as l2_db
from neutron.plugins.ml2 import driver_context
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import periodic_task
+from oslo_service import service as svc
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.db import agents_db
from neutron.i18n import _LE, _LI
from neutron import manager
-from neutron.openstack.common import periodic_task
-from neutron.openstack.common import service as svc
from import mechanism_apic as ma
from neutron.plugins.ml2.drivers import type_vlan # noqa
server = service.Service.create(
binary=binary, manager=manager, topic=topic,
report_interval=report_period, periodic_interval=poll_period)
- svc.launch(server).wait()
+ svc.launch(cfg.CONF, server).wait()
def service_main():
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
+from oslo_service import service
from six import moves
from neutron.agent.linux import ip_lib
from neutron.common import utils as q_utils
from neutron import context
from neutron.i18n import _LE, _LI, _LW
-from neutron.openstack.common import loopingcall
-from neutron.openstack.common import service
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
import l2population_rpc as l2pop_rpc
quitting_rpc_timeout)"Agent initialized successfully, now running... "))
- launcher = service.launch(agent)
+ launcher = service.launch(cfg.CONF, agent)
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import utils as q_utils
from neutron import context
from neutron.i18n import _LE, _LI
-from neutron.openstack.common import loopingcall
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config # noqa
from neutron.plugins.ml2.drivers.mech_sriov.agent.common \
import exceptions as exc
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
import six
from six import moves
from neutron.common import utils as q_utils
from neutron import context
from neutron.i18n import _LE, _LI, _LW
-from neutron.openstack.common import loopingcall
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging import server as rpc_server
+from oslo_service import loopingcall
+from oslo_service import service as common_service
from oslo_utils import excutils
from oslo_utils import importutils
from neutron.db import api as session
from neutron.i18n import _LE, _LI
from neutron import manager
-from neutron.openstack.common import loopingcall
-from neutron.openstack.common import service as common_service
from neutron import wsgi
return service
-class RpcWorker(object):
+class RpcWorker(common_service.ServiceBase):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, plugin):
self._plugin = plugin
# be shared DB connections in child processes which may cause
# DB errors.
- launcher = common_service.ProcessLauncher(wait_interval=1.0)
+ launcher = common_service.ProcessLauncher(cfg.CONF,
+ wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
return launcher
except Exception:
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+from oslo_service import loopingcall
+from oslo_service import periodic_task
+from oslo_service import service
from oslo_utils import importutils
from neutron.agent.common import config
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
-from neutron.openstack.common import loopingcall
-from neutron.openstack.common import periodic_task
-from neutron.openstack.common import service
from neutron import service as neutron_service
- service.launch(server).wait()
+ service.launch(cfg.CONF, server).wait()
def test_dhcp_agent_main_agent_manager(self):
logging_str = 'neutron.agent.common.config.setup_logging'
- launcher_str = 'neutron.openstack.common.service.ServiceLauncher'
+ launcher_str = 'oslo_service.service.ServiceLauncher'
with mock.patch(logging_str):
with mock.patch.object(sys, 'argv') as sys_argv:
with mock.patch(launcher_str) as launcher:
- [,,
+ [,
def test_run_completes_single_pass(self):
l3pluginApi_cls.return_value = self.plugin_api
self.looping_call_p = mock.patch(
- 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
+ 'oslo_service.loopingcall.FixedIntervalLoopingCall')
subnet_id_1 = _uuid()
l3pluginApi_cls.return_value = self.plugin_api
self.looping_call_p = mock.patch(
- 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
+ 'oslo_service.loopingcall.FixedIntervalLoopingCall')
subnet_id_1 = _uuid()
self.cfg_p = mock.patch.object(agent, 'cfg')
self.cfg = self.cfg_p.start()
looping_call_p = mock.patch(
- 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
+ 'oslo_service.loopingcall.FixedIntervalLoopingCall')
self.looping_mock = looping_call_p.start()
self.cfg.CONF.metadata_proxy_socket = '/the/path'
self.cfg.CONF.metadata_workers = 0
with mock.patch(''
- mock.patch('neutron.openstack.common.loopingcall.'
+ mock.patch('oslo_service.loopingcall.'
self.agent = sdnve_neutron_agent.SdnveNeutronAgent(**kwargs)
from import apic_sync
from neutron.tests import base
-LOOPING_CALL = 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall'
+LOOPING_CALL = 'oslo_service.loopingcall.FixedIntervalLoopingCall'
GET_PLUGIN = 'neutron.manager.NeutronManager.get_plugin'
GET_ADMIN_CONTEXT = 'neutron.context.get_admin_context'
L2_DB = 'neutron.plugins.ml2.db.get_locked_port_and_binding'
RPC_CONNECTION = 'neutron.common.rpc.Connection'
AGENTS_DB = 'neutron.db.agents_db'
-PERIODIC_TASK = 'neutron.openstack.common.periodic_task'
+PERIODIC_TASK = 'oslo_service.periodic_task'
DEV_EXISTS = 'neutron.agent.linux.ip_lib.device_exists'
IP_DEVICE = 'neutron.agent.linux.ip_lib.IPDevice'
EXECUTE = 'neutron.agent.linux.utils.execute'
def start(self, interval=0):
- mock.patch('neutron.openstack.common.loopingcall.'
+ mock.patch('oslo_service.loopingcall.'
- mock.patch('neutron.openstack.common.loopingcall.'
- 'FixedIntervalLoopingCall',
+ mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
'neutron.agent.common.ovs_lib.OVSBridge.' 'get_vif_ports',
- mock.patch('neutron.openstack.common.loopingcall.'
+ mock.patch('oslo_service.loopingcall.'
- with mock.patch('neutron.openstack.common.loopingcall.'
+ with mock.patch('oslo_service.loopingcall.'
'FixedIntervalLoopingCall') as loopingcall:
kwargs = {'integ_br': 'integration_bridge',
'polling_interval': 5}
loopingcall_patch = mock.patch(
- 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall')
+ 'oslo_service.loopingcall.FixedIntervalLoopingCall')
self.agent = metering_agent.MeteringAgent('my agent', cfg.CONF)
def test_init_chain(self):
- with mock.patch('neutron.openstack.common.'
+ with mock.patch('oslo_service.'
'periodic_task.PeriodicTasks.__init__') as init:
metering_agent.MeteringAgent('my agent', cfg.CONF)
- init.assert_called_once_with()
+ init.assert_called_once_with(cfg.CONF)
- @mock.patch('neutron.openstack.common.service.ProcessLauncher')
+ @mock.patch('oslo_service.service.ProcessLauncher')
def test_start_multiple_workers(self, ProcessLauncher):
launcher = ProcessLauncher.return_value
from oslo_log import log as logging
from oslo_log import loggers
from oslo_serialization import jsonutils
+from oslo_service import service as common_service
+from oslo_service import systemd
from oslo_utils import excutils
import routes.middleware
import six
from neutron import context
from neutron.db import api
from neutron.i18n import _LE, _LI
-from neutron.openstack.common import service as common_service
-from neutron.openstack.common import systemd
socket_opts = [
LOG = logging.getLogger(__name__)
-class WorkerService(object):
+class WorkerService(common_service.ServiceBase):
"""Wraps a worker to be handled by ProcessLauncher"""
def __init__(self, service, application):
self._service = service
# The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
- self._server = common_service.ProcessLauncher(wait_interval=1.0)
+ self._server = common_service.ProcessLauncher(cfg.CONF,
+ wait_interval=1.0)
self._server.launch_service(service, workers=workers)
# The list of modules to copy from oslo-incubator.git
# The following module is not synchronized by script since it's
# located in tools/ not neutron/openstack/common/. Left here to make it
# explicit that we still ship code from incubator here
# The base module to hold the copy of openstack.common
oslo.policy>=0.5.0 # Apache-2.0
oslo.rootwrap>=2.0.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
+oslo.service>=0.1.0 # Apache-2.0
oslo.utils>=1.6.0 # Apache-2.0