This is required by impl_kombu.py. The engine failed to start without it.
import logging
-class ProcessExecutionError(IOError):
- def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
- description=None):
- if description is None:
- description = "Unexpected error while running command."
- if exit_code is None:
- exit_code = '-'
- message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
- description, cmd, exit_code, stdout, stderr)
- IOError.__init__(self, message)
-
-
class Error(Exception):
def __init__(self, message=None):
super(Error, self).__init__(message)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Network-related utilities and helper functions.
+"""
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+def parse_host_port(address, default_port=None):
+ """
+ Interpret a string as a host:port pair.
+ An IPv6 address MUST be escaped if accompanied by a port,
+ because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+ means both [2001:db8:85a3::8a2e:370:7334] and
+ [2001:db8:85a3::8a2e:370]:7334.
+
+ >>> parse_host_port('server01:80')
+ ('server01', 80)
+ >>> parse_host_port('server01')
+ ('server01', None)
+ >>> parse_host_port('server01', default_port=1234)
+ ('server01', 1234)
+ >>> parse_host_port('[::1]:80')
+ ('::1', 80)
+ >>> parse_host_port('[::1]')
+ ('::1', None)
+ >>> parse_host_port('[::1]', default_port=1234)
+ ('::1', 1234)
+ >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+ ('2001:db8:85a3::8a2e:370:7334', 1234)
+
+ """
+ if address[0] == '[':
+ # Escaped ipv6
+ _host, _port = address[1:].split(']')
+ host = _host
+ if ':' in _port:
+ port = _port.split(':')[1]
+ else:
+ port = default_port
+ else:
+ if address.count(':') == 1:
+ host, port = address.split(':')
+ else:
+ # 0 means ipv4, >1 means ipv6.
+ # We prohibit unescaped ipv6 addresses with port.
+ host = address
+ port = default_port
+
+ return (host, None if port is None else int(port))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.cast_to_server(
+ return rpc_amqp.fanout_cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies.
- msg_id = str(uuid.uuid4().hex)
+ msg_id = uuid.uuid4().hex
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
if subject.strip().lower() in ('true', 'on', 'yes', '1'):
return True
return False
-
-
-def execute(*cmd, **kwargs):
- """
- Helper method to execute command with optional retry.
-
- :cmd Passed to subprocess.Popen.
- :process_input Send to opened process.
- :check_exit_code Defaults to 0. Raise exception.ProcessExecutionError
- unless program exits with this code.
- :delay_on_retry True | False. Defaults to True. If set to True, wait a
- short amount of time before retrying.
- :attempts How many times to retry cmd.
- :run_as_root True | False. Defaults to False. If set to True,
- the command is prefixed by the command specified
- in the root_helper kwarg.
- :root_helper command to prefix all cmd's with
-
- :raises exception.Error on receiving unknown arguments
- :raises exception.ProcessExecutionError
- """
-
- process_input = kwargs.pop('process_input', None)
- check_exit_code = kwargs.pop('check_exit_code', 0)
- delay_on_retry = kwargs.pop('delay_on_retry', True)
- attempts = kwargs.pop('attempts', 1)
- run_as_root = kwargs.pop('run_as_root', False)
- root_helper = kwargs.pop('root_helper', '')
- if len(kwargs):
- raise exception.Error(_('Got unknown keyword args '
- 'to utils.execute: %r') % kwargs)
- if run_as_root:
- cmd = shlex.split(root_helper) + list(cmd)
- cmd = map(str, cmd)
-
- while attempts > 0:
- attempts -= 1
- try:
- LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
- _PIPE = subprocess.PIPE # pylint: disable=E1101
- obj = subprocess.Popen(cmd,
- stdin=_PIPE,
- stdout=_PIPE,
- stderr=_PIPE,
- close_fds=True)
- result = None
- if process_input is not None:
- result = obj.communicate(process_input)
- else:
- result = obj.communicate()
- obj.stdin.close() # pylint: disable=E1101
- _returncode = obj.returncode # pylint: disable=E1101
- if _returncode:
- LOG.debug(_('Result was %s') % _returncode)
- if (isinstance(check_exit_code, int) and
- not isinstance(check_exit_code, bool) and
- _returncode != check_exit_code):
- (stdout, stderr) = result
- raise exception.ProcessExecutionError(
- exit_code=_returncode,
- stdout=stdout,
- stderr=stderr,
- cmd=' '.join(cmd))
- return result
- except exception.ProcessExecutionError:
- if not attempts:
- raise
- else:
- LOG.debug(_('%r failed. Retrying.'), cmd)
- if delay_on_retry:
- greenthread.sleep(random.randint(20, 200) / 100.0)
- finally:
- # NOTE(termie): this appears to be necessary to let the subprocess
- # call clean something up in between calls, without
- # it two execute calls in a row hangs the second one
- greenthread.sleep(0)
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier,rpc,excutils,service,threadgroup,eventlet_backdoor,loopingcall
+modules=gettextutils,cfg,local,iniparser,utils,exception,timeutils,importutils,setup,log,jsonutils,notifier,rpc,excutils,service,threadgroup,eventlet_backdoor,loopingcall,network_utils
# The base module to hold the copy of openstack.common
base=heat