]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Sync service and systemd modules from oslo-incubator
authorJakub Libosvar <libosvar@redhat.com>
Mon, 17 Mar 2014 15:36:01 +0000 (16:36 +0100)
committerJakub Libosvar <libosvar@redhat.com>
Fri, 4 Apr 2014 10:16:50 +0000 (12:16 +0200)
This patch make systemd know when neutron-service was started. This is
needed in HA environment, previously systemd returned success even
before neutron-server was able to handle requests.

Current oslo-incubator commit on HEAD:
b7ad6ddab8b1d61bf4f52ccaa461a9d68809747b

Implements: blueprint service-readiness
Change-Id: Ic9e4abd11b614a896fbd7454b9a604a69a248d0f

neutron/openstack/common/service.py
neutron/openstack/common/systemd.py [new file with mode: 0644]
openstack-common.conf

index b8144bb3a73495966df82869c688e3b3f9a9410c..627dda4ffddac822aa59c390bca21fe1556f5d6e 100644 (file)
@@ -38,9 +38,10 @@ from eventlet import event
 from oslo.config import cfg
 
 from neutron.openstack.common import eventlet_backdoor
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _LE, _LI, _LW
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
+from neutron.openstack.common import systemd
 from neutron.openstack.common import threadgroup
 
 
@@ -163,7 +164,7 @@ class ServiceLauncher(Launcher):
         status = None
         signo = 0
 
-        LOG.debug(_('Full set of CONF:'))
+        LOG.debug('Full set of CONF:')
         CONF.log_opt_values(LOG, std_logging.DEBUG)
 
         try:
@@ -172,7 +173,7 @@ class ServiceLauncher(Launcher):
             super(ServiceLauncher, self).wait()
         except SignalExit as exc:
             signame = _signo_to_signame(exc.signo)
-            LOG.info(_('Caught %s, exiting'), signame)
+            LOG.info(_LI('Caught %s, exiting'), signame)
             status = exc.code
             signo = exc.signo
         except SystemExit as exc:
@@ -184,7 +185,7 @@ class ServiceLauncher(Launcher):
                     rpc.cleanup()
                 except Exception:
                     # We're shutting down, so it doesn't matter at this point.
-                    LOG.exception(_('Exception during rpc cleanup.'))
+                    LOG.exception(_LE('Exception during rpc cleanup.'))
 
         return status, signo
 
@@ -235,7 +236,7 @@ class ProcessLauncher(object):
         # dies unexpectedly
         self.readpipe.read()
 
-        LOG.info(_('Parent process has died unexpectedly, exiting'))
+        LOG.info(_LI('Parent process has died unexpectedly, exiting'))
 
         sys.exit(1)
 
@@ -266,13 +267,13 @@ class ProcessLauncher(object):
             launcher.wait()
         except SignalExit as exc:
             signame = _signo_to_signame(exc.signo)
-            LOG.info(_('Caught %s, exiting'), signame)
+            LOG.info(_LI('Caught %s, exiting'), signame)
             status = exc.code
             signo = exc.signo
         except SystemExit as exc:
             status = exc.code
         except BaseException:
-            LOG.exception(_('Unhandled exception'))
+            LOG.exception(_LE('Unhandled exception'))
             status = 2
         finally:
             launcher.stop()
@@ -305,7 +306,7 @@ class ProcessLauncher(object):
             # start up quickly but ensure we don't fork off children that
             # die instantly too quickly.
             if time.time() - wrap.forktimes[0] < wrap.workers:
-                LOG.info(_('Forking too fast, sleeping'))
+                LOG.info(_LI('Forking too fast, sleeping'))
                 time.sleep(1)
 
             wrap.forktimes.pop(0)
@@ -324,7 +325,7 @@ class ProcessLauncher(object):
 
             os._exit(status)
 
-        LOG.info(_('Started child %d'), pid)
+        LOG.info(_LI('Started child %d'), pid)
 
         wrap.children.add(pid)
         self.children[pid] = wrap
@@ -334,7 +335,7 @@ class ProcessLauncher(object):
     def launch_service(self, service, workers=1):
         wrap = ServiceWrapper(service, workers)
 
-        LOG.info(_('Starting %d workers'), wrap.workers)
+        LOG.info(_LI('Starting %d workers'), wrap.workers)
         while self.running and len(wrap.children) < wrap.workers:
             self._start_child(wrap)
 
@@ -351,15 +352,15 @@ class ProcessLauncher(object):
 
         if os.WIFSIGNALED(status):
             sig = os.WTERMSIG(status)
-            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+            LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
                      dict(pid=pid, sig=sig))
         else:
             code = os.WEXITSTATUS(status)
-            LOG.info(_('Child %(pid)s exited with status %(code)d'),
+            LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
                      dict(pid=pid, code=code))
 
         if pid not in self.children:
-            LOG.warning(_('pid %d not in child list'), pid)
+            LOG.warning(_LW('pid %d not in child list'), pid)
             return None
 
         wrap = self.children.pop(pid)
@@ -381,22 +382,25 @@ class ProcessLauncher(object):
     def wait(self):
         """Loop waiting on children to die and respawning as necessary."""
 
-        LOG.debug(_('Full set of CONF:'))
+        LOG.debug('Full set of CONF:')
         CONF.log_opt_values(LOG, std_logging.DEBUG)
 
-        while True:
-            self.handle_signal()
-            self._respawn_children()
-            if self.sigcaught:
-                signame = _signo_to_signame(self.sigcaught)
-                LOG.info(_('Caught %s, stopping children'), signame)
-            if not _is_sighup_and_daemon(self.sigcaught):
-                break
-
-            for pid in self.children:
-                os.kill(pid, signal.SIGHUP)
-            self.running = True
-            self.sigcaught = None
+        try:
+            while True:
+                self.handle_signal()
+                self._respawn_children()
+                if self.sigcaught:
+                    signame = _signo_to_signame(self.sigcaught)
+                    LOG.info(_LI('Caught %s, stopping children'), signame)
+                if not _is_sighup_and_daemon(self.sigcaught):
+                    break
+
+                for pid in self.children:
+                    os.kill(pid, signal.SIGHUP)
+                self.running = True
+                self.sigcaught = None
+        except eventlet.greenlet.GreenletExit:
+            LOG.info(_LI("Wait called after thread killed.  Cleaning up."))
 
         for pid in self.children:
             try:
@@ -407,7 +411,7 @@ class ProcessLauncher(object):
 
         # Wait for children to die
         if self.children:
-            LOG.info(_('Waiting on %d children to exit'), len(self.children))
+            LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
             while self.children:
                 self._wait_child()
 
@@ -484,14 +488,16 @@ class Services(object):
 
         """
         service.start()
+        systemd.notify_once()
         done.wait()
 
 
-def launch(service, workers=None):
-    if workers:
-        launcher = ProcessLauncher()
-        launcher.launch_service(service, workers=workers)
-    else:
+def launch(service, workers=1):
+    if workers is None or workers == 1:
         launcher = ServiceLauncher()
         launcher.launch_service(service)
+    else:
+        launcher = ProcessLauncher()
+        launcher.launch_service(service, workers=workers)
+
     return launcher
diff --git a/neutron/openstack/common/systemd.py b/neutron/openstack/common/systemd.py
new file mode 100644 (file)
index 0000000..e1ba656
--- /dev/null
@@ -0,0 +1,104 @@
+# Copyright 2012-2014 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Helper module for systemd service readiness notification.
+"""
+
+import os
+import socket
+import sys
+
+from neutron.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+def _abstractify(socket_name):
+    if socket_name.startswith('@'):
+        # abstract namespace socket
+        socket_name = '\0%s' % socket_name[1:]
+    return socket_name
+
+
+def _sd_notify(unset_env, msg):
+    notify_socket = os.getenv('NOTIFY_SOCKET')
+    if notify_socket:
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        try:
+            sock.connect(_abstractify(notify_socket))
+            sock.sendall(msg)
+            if unset_env:
+                del os.environ['NOTIFY_SOCKET']
+        except EnvironmentError:
+            LOG.debug("Systemd notification failed", exc_info=True)
+        finally:
+            sock.close()
+
+
+def notify():
+    """Send notification to Systemd that service is ready.
+    For details see
+      http://www.freedesktop.org/software/systemd/man/sd_notify.html
+    """
+    _sd_notify(False, 'READY=1')
+
+
+def notify_once():
+    """Send notification once to Systemd that service is ready.
+    Systemd sets NOTIFY_SOCKET environment variable with the name of the
+    socket listening for notifications from services.
+    This method removes the NOTIFY_SOCKET environment variable to ensure
+    notification is sent only once.
+    """
+    _sd_notify(True, 'READY=1')
+
+
+def onready(notify_socket, timeout):
+    """Wait for systemd style notification on the socket.
+
+    :param notify_socket: local socket address
+    :type notify_socket:  string
+    :param timeout:       socket timeout
+    :type timeout:        float
+    :returns:             0 service ready
+                          1 service not ready
+                          2 timeout occured
+    """
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    sock.settimeout(timeout)
+    sock.bind(_abstractify(notify_socket))
+    try:
+        msg = sock.recv(512)
+    except socket.timeout:
+        return 2
+    finally:
+        sock.close()
+    if 'READY=1' in msg:
+        return 0
+    else:
+        return 1
+
+
+if __name__ == '__main__':
+    # simple CLI for testing
+    if len(sys.argv) == 1:
+        notify()
+    elif len(sys.argv) >= 2:
+        timeout = float(sys.argv[1])
+        notify_socket = os.getenv('NOTIFY_SOCKET')
+        if notify_socket:
+            retval = onready(notify_socket, timeout)
+            sys.exit(retval)
index 9523f9c08048fcb30586ea2d0fb141a971ca0434..395576fb921b1c76280fe8c6cdb730c42cdd4a3e 100644 (file)
@@ -26,6 +26,7 @@ module=processutils
 module=rpc
 module=service
 module=sslutils
+module=systemd
 module=threadgroup
 module=timeutils
 module=uuidutils