gettext.install('heat', unicode=1)
+from heat import rpc
from heat.common import config
from heat.common import wsgi
from paste import httpserver
try:
conf = config.HeatConfigOpts()
conf()
+ config.FLAGS = conf
+ rpc.configure(conf)
app = config.load_paste_app(conf)
gettext.install('heat', unicode=1)
+from heat import rpc
from heat import service
from heat.common import config
from heat.common import utils
if __name__ == '__main__':
- config.FLAGS(sys.argv)
- config.setup_logging(config.FLAGS)
- db_api.configure(config.FLAGS)
+ conf = config.HeatEngineConfigOpts()
+ conf()
+ config.FLAGS = conf
+
+ config.setup_logging(conf)
+ rpc.configure(conf)
+ db_api.configure(conf)
#utils.monkey_patch()
server = service.Service.create(binary='heat-engine',
topic='engine',
- manager='heat.engine.manager.EngineManager')
+ manager='heat.engine.manager.EngineManager',
+ config=conf)
service.serve(server)
service.wait()
cfg.StrOpt('config_file'),
]
-
-class HeatConfigOpts(cfg.CommonConfigOpts):
-
- def __init__(self, default_config_files=None, **kwargs):
- super(HeatConfigOpts, self).__init__(
- project='heat',
- version='%%prog %s' % version.version_string(),
- default_config_files=default_config_files,
- **kwargs)
-
-class HeatEngineConfigOpts(cfg.CommonConfigOpts):
- db_opts = [
- cfg.StrOpt('db_backend', default='heat.db.anydbm.api', help='The backend to use for db'),
- cfg.StrOpt('sql_connection',
- default='mysql://heat:heat@localhost/heat',
- help='The SQLAlchemy connection string used to connect to the '
- 'database'),
- cfg.IntOpt('sql_idle_timeout',
- default=3600,
- help='timeout before idle sql connections are reaped'),
- ]
- engine_opts = [
- cfg.StrOpt('host',
- default=socket.gethostname(),
- help='Name of this node. This can be an opaque identifier. '
- 'It is not necessarily a hostname, FQDN, or IP address.'),
- cfg.StrOpt('instance_driver',
- default='heat.engine.nova',
- help='Driver to use for controlling instances'),
+FLAGS = None
+
+rpc_opts = [
+ cfg.StrOpt('rpc_backend',
+ default='heat.rpc.impl_qpid',
+ help="The messaging module to use, defaults to kombu."),
+ cfg.IntOpt('rpc_thread_pool_size',
+ default=1024,
+ help='Size of RPC thread pool'),
+ cfg.IntOpt('rpc_conn_pool_size',
+ default=30,
+ help='Size of RPC connection pool'),
+ cfg.IntOpt('rpc_response_timeout',
+ default=60,
+ help='Seconds to wait for a response from call or multicall'),
+ cfg.StrOpt('qpid_hostname',
+ default='localhost',
+ help='Qpid broker hostname'),
+ cfg.StrOpt('qpid_port',
+ default='5672',
+ help='Qpid broker port'),
+ cfg.StrOpt('qpid_username',
+ default='',
+ help='Username for qpid connection'),
+ cfg.StrOpt('qpid_password',
+ default='',
+ help='Password for qpid connection'),
+ cfg.StrOpt('qpid_sasl_mechanisms',
+ default='',
+ help='Space separated list of SASL mechanisms to use for auth'),
+ cfg.BoolOpt('qpid_reconnect',
+ default=True,
+ help='Automatically reconnect'),
+ cfg.IntOpt('qpid_reconnect_timeout',
+ default=0,
+ help='Reconnection timeout in seconds'),
+ cfg.IntOpt('qpid_reconnect_limit',
+ default=0,
+ help='Max reconnections before giving up'),
+ cfg.IntOpt('qpid_reconnect_interval_min',
+ default=0,
+ help='Minimum seconds between reconnection attempts'),
+ cfg.IntOpt('qpid_reconnect_interval_max',
+ default=0,
+ help='Maximum seconds between reconnection attempts'),
+ cfg.IntOpt('qpid_reconnect_interval',
+ default=0,
+ help='Equivalent to setting max and min to the same value'),
+ cfg.IntOpt('qpid_heartbeat',
+ default=5,
+ help='Seconds between connection keepalive heartbeats'),
+ cfg.StrOpt('qpid_protocol',
+ default='tcp',
+ help="Transport to use, either 'tcp' or 'ssl'"),
+ cfg.BoolOpt('qpid_tcp_nodelay',
+ default=True,
+ help='Disable Nagle algorithm'),
cfg.StrOpt('rabbit_host',
default='localhost',
help='the RabbitMQ host'),
]
+
+class HeatConfigOpts(cfg.CommonConfigOpts):
+ def __init__(self, default_config_files=None, **kwargs):
+ super(HeatConfigOpts, self).__init__(
+ project='heat',
+ version='%%prog %s' % version.version_string(),
+ default_config_files=default_config_files,
+ **kwargs)
+ self.register_cli_opts(rpc_opts)
+
+class HeatEngineConfigOpts(cfg.CommonConfigOpts):
+
+ service_opts = [
+ cfg.IntOpt('report_interval',
+ default=10,
+ help='seconds between nodes reporting state to datastore'),
+ cfg.IntOpt('periodic_interval',
+ default=60,
+ help='seconds between running periodic tasks'),
+ cfg.StrOpt('ec2_listen',
+ default="0.0.0.0",
+ help='IP address for EC2 API to listen'),
+ cfg.IntOpt('ec2_listen_port',
+ default=8773,
+ help='port for ec2 api to listen'),
+ cfg.StrOpt('osapi_compute_listen',
+ default="0.0.0.0",
+ help='IP address for OpenStack API to listen'),
+ cfg.IntOpt('osapi_compute_listen_port',
+ default=8774,
+ help='list port for osapi compute'),
+ cfg.StrOpt('metadata_manager',
+ default='nova.api.manager.MetadataManager',
+ help='OpenStack metadata service manager'),
+ cfg.StrOpt('metadata_listen',
+ default="0.0.0.0",
+ help='IP address for metadata api to listen'),
+ cfg.IntOpt('metadata_listen_port',
+ default=8775,
+ help='port for metadata api to listen'),
+ cfg.StrOpt('osapi_volume_listen',
+ default="0.0.0.0",
+ help='IP address for OpenStack Volume API to listen'),
+ cfg.IntOpt('osapi_volume_listen_port',
+ default=8776,
+ help='port for os volume api to listen'),
+ ]
+ db_opts = [
+ cfg.StrOpt('db_backend', default='heat.db.anydbm.api', help='The backend to use for db'),
+ cfg.StrOpt('sql_connection',
+ default='mysql://heat:heat@localhost/heat',
+ help='The SQLAlchemy connection string used to connect to the '
+ 'database'),
+ cfg.IntOpt('sql_idle_timeout',
+ default=3600,
+ help='timeout before idle sql connections are reaped'),
+ ]
+ engine_opts = [
+ cfg.StrOpt('host',
+ default=socket.gethostname(),
+ help='Name of this node. This can be an opaque identifier. '
+ 'It is not necessarily a hostname, FQDN, or IP address.'),
+ cfg.StrOpt('instance_driver',
+ default='heat.engine.nova',
+ help='Driver to use for controlling instances'),
+ ]
+
def __init__(self, default_config_files=None, **kwargs):
super(HeatEngineConfigOpts, self).__init__(
project='heat',
prog='heat-engine')
self.register_cli_opts(self.engine_opts)
self.register_cli_opts(self.db_opts)
-
-FLAGS = HeatEngineConfigOpts()
-
+ self.register_cli_opts(self.service_opts)
+ self.register_cli_opts(rpc_opts)
def setup_logging(conf):
"""
# under the License.
from heat.common import exception
-from heat.common import utils
from heat.common import wsgi
from heat.openstack.common import cfg
+from heat.openstack.common import utils
class RequestContext(object):
import functools
import urlparse
-
+from heat.openstack.common.exception import OpenstackException
class RedirectException(Exception):
def __init__(self, url):
self.url = urlparse.urlparse(url)
-
-class HeatException(Exception):
- """
- Base Heat Exception
-
- To correctly use this class, inherit from it and define
- a 'message' property. That message will get printf'd
- with the keyword arguments provided to the constructor.
- """
- message = _("An unknown exception occurred")
-
- def __init__(self, *args, **kwargs):
- try:
- self._error_string = self.message % kwargs
- except Exception:
- # at least get the core message out if something happened
- self._error_string = self.message
- if len(args) > 0:
- # If there is a non-kwarg parameter, assume it's the error
- # message or reason description and tack it on to the end
- # of the exception message
- # Convert all arguments into their string representations...
- args = ["%s" % arg for arg in args]
- self._error_string = (self._error_string +
- "\nDetails: %s" % '\n'.join(args))
-
- def __str__(self):
- return self._error_string
-
def wrap_exception(notifier=None, publisher_id=None, event_type=None,
level=None):
"""This decorator wraps a method to catch any exceptions that may
return functools.wraps(f)(wrapped)
return inner
-
-class NovaException(Exception):
- """Base Nova Exception
-
- To correctly use this class, inherit from it and define
- a 'message' property. That message will get printf'd
- with the keyword arguments provided to the constructor.
-
- """
- message = _("An unknown exception occurred.")
-
- def __init__(self, message=None, **kwargs):
- self.kwargs = kwargs
-
- if 'code' not in self.kwargs:
- try:
- self.kwargs['code'] = self.code
- except AttributeError:
- pass
-
- if not message:
- try:
- message = self.message % kwargs
-
- except Exception as e:
- # at least get the core message out if something happened
- message = self.message
-
- super(NovaException, self).__init__(message)
-
-class MissingArgumentError(HeatException):
- message = _("Missing required argument.")
-
-
-class MissingCredentialError(HeatException):
+class MissingCredentialError(OpenstackException):
message = _("Missing required credential: %(required)s")
-class BadAuthStrategy(HeatException):
+class BadAuthStrategy(OpenstackException):
message = _("Incorrect auth strategy, expected \"%(expected)s\" but "
"received \"%(received)s\"")
-
-class NotFound(HeatException):
- message = _("An object with the specified identifier was not found.")
-
-
-class UnknownScheme(HeatException):
- message = _("Unknown scheme '%(scheme)s' found in URI")
-
-
-class BadStoreUri(HeatException):
- message = _("The Store URI %(uri)s was malformed. Reason: %(reason)s")
-
-
-class Duplicate(HeatException):
- message = _("An object with the same identifier already exists.")
-
-
-class StorageFull(HeatException):
- message = _("There is not enough disk space on the image storage media.")
-
-
-class StorageWriteDenied(HeatException):
- message = _("Permission to write image storage media denied.")
-
-
-class ImportFailure(HeatException):
- message = _("Failed to import requested object/class: '%(import_str)s'. "
- "Reason: %(reason)s")
-
-
-class AuthBadRequest(HeatException):
+class AuthBadRequest(OpenstackException):
message = _("Connect error/bad request to Auth service at URL %(url)s.")
-class AuthUrlNotFound(HeatException):
+class AuthUrlNotFound(OpenstackException):
message = _("Auth service at URL %(url)s not found.")
-class AuthorizationFailure(HeatException):
+class AuthorizationFailure(OpenstackException):
message = _("Authorization failed.")
-class NotAuthenticated(HeatException):
+class NotAuthenticated(OpenstackException):
message = _("You are not authenticated.")
-class Forbidden(HeatException):
+class Forbidden(OpenstackException):
message = _("You are not authorized to complete this action.")
#NOTE(bcwaldon): here for backwards-compatability, need to deprecate.
message = _("You are not authorized to complete this action.")
-class Invalid(HeatException):
+class Invalid(OpenstackException):
message = _("Data supplied was not valid.")
-class AuthorizationRedirect(HeatException):
+class AuthorizationRedirect(OpenstackException):
message = _("Redirecting to %(uri)s for authorization.")
-class DatabaseMigrationError(HeatException):
- message = _("There was an error migrating the database.")
-
-
-class ClientConnectionError(HeatException):
- message = _("There was an error connecting to a server")
-
-
-class ClientConfigurationError(HeatException):
+class ClientConfigurationError(OpenstackException):
message = _("There was an error configuring the client.")
-class MultipleChoices(HeatException):
+class MultipleChoices(OpenstackException):
message = _("The request returned a 302 Multiple Choices. This generally "
"means that you have not included a version indicator in a "
"request URI.\n\nThe body of response returned:\n%(body)s")
-class LimitExceeded(HeatException):
+class LimitExceeded(OpenstackException):
message = _("The request returned a 413 Request Entity Too Large. This "
"generally means that rate limiting or a quota threshold was "
"breached.\n\nThe response body:\n%(body)s")
super(LimitExceeded, self).__init__(*args, **kwargs)
-class ServiceUnavailable(HeatException):
+class ServiceUnavailable(OpenstackException):
message = _("The request returned a 503 ServiceUnavilable. This "
"generally occurs on service overload or other transient "
"outage.")
else None)
super(ServiceUnavailable, self).__init__(*args, **kwargs)
-class RequestUriTooLong(HeatException):
+class RequestUriTooLong(OpenstackException):
message = _("The URI was too long.")
-class ServerError(HeatException):
+class ServerError(OpenstackException):
message = _("The request returned 500 Internal Server Error"
"\n\nThe response body:\n%(body)s")
-
-class UnexpectedStatus(HeatException):
- message = _("The request returned an unexpected status: %(status)s."
- "\n\nThe response body:\n%(body)s")
-
-
-class InvalidContentType(HeatException):
- message = _("Invalid content type %(content_type)s")
-
-
-class BadRegistryConnectionConfiguration(HeatException):
- message = _("Registry was not configured correctly on API server. "
- "Reason: %(reason)s")
-
-
-class BadStoreConfiguration(HeatException):
- message = _("Store %(store_name)s could not be configured correctly. "
- "Reason: %(reason)s")
-
-
-class BadDriverConfiguration(HeatException):
- message = _("Driver %(driver_name)s could not be configured correctly. "
- "Reason: %(reason)s")
-
-
-class StoreDeleteNotSupported(HeatException):
- message = _("Deleting images from this store is not supported.")
-
-
-class StoreAddDisabled(HeatException):
- message = _("Configuration for store failed. Adding images to this "
- "store is disabled.")
-
-
-class InvalidNotifierStrategy(HeatException):
- message = _("'%(strategy)s' is not an available notifier strategy.")
-
-
-class MaxRedirectsExceeded(HeatException):
+class MaxRedirectsExceeded(OpenstackException):
message = _("Maximum redirects (%(redirects)s) was exceeded.")
-class InvalidRedirect(HeatException):
+class InvalidRedirect(OpenstackException):
message = _("Received invalid HTTP redirect.")
-class NoServiceEndpoint(HeatException):
+class NoServiceEndpoint(OpenstackException):
message = _("Response from Keystone does not contain a Heat endpoint.")
-class RegionAmbiguity(HeatException):
+class RegionAmbiguity(OpenstackException):
message = _("Multiple 'image' service matches for region %(region)s. This "
"generally means that a region is required and you have not "
"supplied one.")
from eventlet import semaphore
from eventlet.green import subprocess
-from heat.common import exception
+from heat.openstack.common import exception
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
-def import_class(import_str):
- """Returns a class from a string including module and class."""
- mod_str, _sep, class_str = import_str.rpartition('.')
- try:
- __import__(mod_str)
- return getattr(sys.modules[mod_str], class_str)
- except (ImportError, ValueError, AttributeError), exc:
- #LOG.debug(_('Inner Exception: %s'), exc)
- raise exception.ClassNotFound(class_name=class_str, exception=exc)
-
-
-def import_object(import_str):
- """Returns an object including a module or module and class."""
- try:
- __import__(import_str)
- return sys.modules[import_str]
- except ImportError:
- cls = import_class(import_str)
- return cls()
-
-class LazyPluggable(object):
- """A pluggable backend loaded lazily based on some value."""
-
- def __init__(self, pivot, **backends):
- self.__backends = backends
- self.__pivot = pivot
- self.__backend = None
-
- def __get_backend(self):
- if not self.__backend:
- backend_name = FLAGS[self.__pivot]
- if backend_name not in self.__backends:
- raise exception.Error(_('Invalid backend: %s') % backend_name)
-
- backend = self.__backends[backend_name]
- if isinstance(backend, tuple):
- name = backend[0]
- fromlist = backend[1]
- else:
- name = backend
- fromlist = backend
-
- self.__backend = __import__(name, None, None, fromlist)
- #LOG.debug(_('backend %s'), self.__backend)
- return self.__backend
-
- def __getattr__(self, key):
- backend = self.__get_backend()
- return getattr(backend, key)
-
def chunkreadable(iter, chunk_size=65536):
"""
Wrap a readable iterator with a reader yielding chunks of
break
-def import_class(import_str):
- """Returns a class from a string including module and class"""
- mod_str, _sep, class_str = import_str.rpartition('.')
- try:
- __import__(mod_str)
- return getattr(sys.modules[mod_str], class_str)
- except (ImportError, ValueError, AttributeError), e:
- raise exception.ImportFailure(import_str=import_str,
- reason=e)
-
-
-def import_object(import_str):
- """Returns an object including a module or module and class"""
- try:
- __import__(import_str)
- return sys.modules[import_str]
- except ImportError:
- cls = import_class(import_str)
- return cls()
-
-
def generate_uuid():
return str(uuid.uuid4())
str += ('Z' if tz == 'UTC' else tz)
return str
-
-def parse_isotime(timestr):
- """Turn an iso formatted time back into a datetime."""
- try:
- return iso8601.parse_date(timestr)
- except (iso8601.ParseError, TypeError) as e:
- raise ValueError(e.message)
-
-
-def normalize_time(timestamp):
- """Normalize time in arbitrary timezone to UTC"""
- offset = timestamp.utcoffset()
- return timestamp.replace(tzinfo=None) - offset if offset else timestamp
-
-def utcnow():
- """Overridable version of utils.utcnow."""
- if utcnow.override_time:
- return utcnow.override_time
- return datetime.datetime.utcnow()
-
-utcnow.override_time = None
-
-
class LoopingCallDone(Exception):
"""Exception to break out and stop a LoopingCall.
import webob.exc
from heat.common import exception
-from heat.common import utils
from heat.openstack.common import cfg
+from heat.openstack.common import utils
bind_opts = [
import logging
from heat.openstack.common import local
-from heat.common import utils
+from heat.openstack.common import utils
+from heat.common import utils as heat_utils
LOG = logging.getLogger(__name__)
def generate_request_id():
- return 'req-' + str(utils.gen_uuid())
+ return 'req-' + str(heat_utils.gen_uuid())
class RequestContext(object):
if not timestamp:
timestamp = utils.utcnow()
if isinstance(timestamp, basestring):
- timestamp = utils.parse_strtime(timestamp)
+ timestamp = heat_utils.parse_strtime(timestamp)
self.timestamp = timestamp
if not request_id:
request_id = generate_request_id()
'read_deleted': self.read_deleted,
'roles': self.roles,
'remote_address': self.remote_address,
- 'timestamp': utils.strtime(self.timestamp),
+ 'timestamp': heat_utils.strtime(self.timestamp),
'request_id': self.request_id,
'auth_token': self.auth_token}
>>> db.event_get(context, event_id)
# Event object received
-The underlying driver is loaded as a :class:`LazyPluggable`. SQLAlchemy is
-currently the only supported backend.
+The underlying driver is loaded . SQLAlchemy is currently the only
+supported backend.
'''
-from heat.common import utils
+from heat.openstack.common import utils
def configure(conf):
global IMPL
from novaclient.v1_1 import client
from heat.db import api as db_api
-from heat.common.config import HeatEngineConfigOpts
-import pdb
-db_api.configure(HeatEngineConfigOpts())
logger = logging.getLogger('heat.engine.resources')
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Exceptions common to OpenStack projects
+"""
+
+import logging
+
+
+class ProcessExecutionError(IOError):
+ def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+ description=None):
+ if description is None:
+ description = "Unexpected error while running command."
+ if exit_code is None:
+ exit_code = '-'
+ message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
+ description, cmd, exit_code, stdout, stderr)
+ IOError.__init__(self, message)
+
+
+class Error(Exception):
+ def __init__(self, message=None):
+ super(Error, self).__init__(message)
+
+
+class ApiError(Error):
+ def __init__(self, message='Unknown', code='Unknown'):
+ self.message = message
+ self.code = code
+ super(ApiError, self).__init__('%s: %s' % (code, message))
+
+
+class NotFound(Error):
+ pass
+
+
+class UnknownScheme(Error):
+
+ msg = "Unknown scheme '%s' found in URI"
+
+ def __init__(self, scheme):
+ msg = self.__class__.msg % scheme
+ super(UnknownScheme, self).__init__(msg)
+
+
+class BadStoreUri(Error):
+
+ msg = "The Store URI %s was malformed. Reason: %s"
+
+ def __init__(self, uri, reason):
+ msg = self.__class__.msg % (uri, reason)
+ super(BadStoreUri, self).__init__(msg)
+
+
+class Duplicate(Error):
+ pass
+
+
+class NotAuthorized(Error):
+ pass
+
+
+class NotEmpty(Error):
+ pass
+
+
+class Invalid(Error):
+ pass
+
+
+class BadInputError(Exception):
+ """Error resulting from a client sending bad input to a server"""
+ pass
+
+
+class MissingArgumentError(Error):
+ pass
+
+
+class DatabaseMigrationError(Error):
+ pass
+
+
+class ClientConnectionError(Exception):
+ """Error resulting from a client connecting to a server"""
+ pass
+
+
+def wrap_exception(f):
+ def _wrap(*args, **kw):
+ try:
+ return f(*args, **kw)
+ except Exception, e:
+ if not isinstance(e, Error):
+ #exc_type, exc_value, exc_traceback = sys.exc_info()
+ logging.exception('Uncaught exception')
+ #logging.error(traceback.extract_stack(exc_traceback))
+ raise Error(str(e))
+ raise
+ _wrap.func_name = f.func_name
+ return _wrap
+
+
+class OpenstackException(Exception):
+ """
+ Base Exception
+
+ To correctly use this class, inherit from it and define
+ a 'message' property. That message will get printf'd
+ with the keyword arguments provided to the constructor.
+ """
+ message = "An unknown exception occurred"
+
+ def __init__(self, **kwargs):
+ try:
+ self._error_string = self.message % kwargs
+
+ except Exception:
+ # at least get the core message out if something happened
+ self._error_string = self.message
+
+ def __str__(self):
+ return self._error_string
+
+
+class MalformedRequestBody(OpenstackException):
+ message = "Malformed message body: %(reason)s"
+
+
+class InvalidContentType(OpenstackException):
+ message = "Invalid content type %(content_type)s"
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import datetime
+import logging
+import os
+import random
+import shlex
+import sys
+
+from eventlet import greenthread
+from eventlet.green import subprocess
+import iso8601
+
+from heat.openstack.common import exception
+
+
+TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
+LOG = logging.getLogger(__name__)
+
+
+def int_from_bool_as_string(subject):
+ """
+ Interpret a string as a boolean and return either 1 or 0.
+
+ Any string value in:
+ ('True', 'true', 'On', 'on', '1')
+ is interpreted as a boolean True.
+
+ Useful for JSON-decoded stuff and config file parsing
+ """
+ return bool_from_string(subject) and 1 or 0
+
+
+def bool_from_string(subject):
+ """
+ Interpret a string as a boolean.
+
+ Any string value in:
+ ('True', 'true', 'On', 'on', 'Yes', 'yes', '1')
+ is interpreted as a boolean True.
+
+ Useful for JSON-decoded stuff and config file parsing
+ """
+ if isinstance(subject, bool):
+ return subject
+ if isinstance(subject, basestring):
+ if subject.strip().lower() in ('true', 'on', 'yes', '1'):
+ return True
+ return False
+
+
+def execute(*cmd, **kwargs):
+ """
+ Helper method to execute command with optional retry.
+
+ :cmd Passed to subprocess.Popen.
+ :process_input Send to opened process.
+ :check_exit_code Defaults to 0. Raise exception.ProcessExecutionError
+ unless program exits with this code.
+ :delay_on_retry True | False. Defaults to True. If set to True, wait a
+ short amount of time before retrying.
+ :attempts How many times to retry cmd.
+ :run_as_root True | False. Defaults to False. If set to True,
+ the command is prefixed by the command specified
+ in the root_helper kwarg.
+ :root_helper command to prefix all cmd's with
+
+ :raises exception.Error on receiving unknown arguments
+ :raises exception.ProcessExecutionError
+ """
+
+ process_input = kwargs.pop('process_input', None)
+ check_exit_code = kwargs.pop('check_exit_code', 0)
+ delay_on_retry = kwargs.pop('delay_on_retry', True)
+ attempts = kwargs.pop('attempts', 1)
+ run_as_root = kwargs.pop('run_as_root', False)
+ root_helper = kwargs.pop('root_helper', '')
+ if len(kwargs):
+ raise exception.Error(_('Got unknown keyword args '
+ 'to utils.execute: %r') % kwargs)
+ if run_as_root:
+ cmd = shlex.split(root_helper) + list(cmd)
+ cmd = map(str, cmd)
+
+ while attempts > 0:
+ attempts -= 1
+ try:
+ LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
+ _PIPE = subprocess.PIPE # pylint: disable=E1101
+ obj = subprocess.Popen(cmd,
+ stdin=_PIPE,
+ stdout=_PIPE,
+ stderr=_PIPE,
+ close_fds=True)
+ result = None
+ if process_input is not None:
+ result = obj.communicate(process_input)
+ else:
+ result = obj.communicate()
+ obj.stdin.close() # pylint: disable=E1101
+ _returncode = obj.returncode # pylint: disable=E1101
+ if _returncode:
+ LOG.debug(_('Result was %s') % _returncode)
+ if (isinstance(check_exit_code, int) and
+ not isinstance(check_exit_code, bool) and
+ _returncode != check_exit_code):
+ (stdout, stderr) = result
+ raise exception.ProcessExecutionError(
+ exit_code=_returncode,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=' '.join(cmd))
+ return result
+ except exception.ProcessExecutionError:
+ if not attempts:
+ raise
+ else:
+ LOG.debug(_('%r failed. Retrying.'), cmd)
+ if delay_on_retry:
+ greenthread.sleep(random.randint(20, 200) / 100.0)
+ finally:
+ # NOTE(termie): this appears to be necessary to let the subprocess
+ # call clean something up in between calls, without
+ # it two execute calls in a row hangs the second one
+ greenthread.sleep(0)
+
+
+def import_class(import_str):
+ """Returns a class from a string including module and class"""
+ mod_str, _sep, class_str = import_str.rpartition('.')
+ try:
+ __import__(mod_str)
+ return getattr(sys.modules[mod_str], class_str)
+ except (ImportError, ValueError, AttributeError):
+ raise exception.NotFound('Class %s cannot be found' % class_str)
+
+
+def import_object(import_str):
+ """Returns an object including a module or module and class"""
+ try:
+ __import__(import_str)
+ return sys.modules[import_str]
+ except ImportError:
+ return import_class(import_str)
+
+
+def isotime(at=None):
+ """Stringify time in ISO 8601 format"""
+ if not at:
+ at = datetime.datetime.utcnow()
+ str = at.strftime(TIME_FORMAT)
+ tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
+ str += ('Z' if tz == 'UTC' else tz)
+ return str
+
+
+def parse_isotime(timestr):
+ """Parse time from ISO 8601 format"""
+ try:
+ return iso8601.parse_date(timestr)
+ except iso8601.ParseError as e:
+ raise ValueError(e.message)
+ except TypeError as e:
+ raise ValueError(e.message)
+
+
+def normalize_time(timestamp):
+ """Normalize time in arbitrary timezone to UTC"""
+ offset = timestamp.utcoffset()
+ return timestamp.replace(tzinfo=None) - offset if offset else timestamp
+
+
+def utcnow():
+ """Overridable version of utils.utcnow."""
+ if utcnow.override_time:
+ return utcnow.override_time
+ return datetime.datetime.utcnow()
+
+
+utcnow.override_time = None
+
+
+def set_time_override(override_time=datetime.datetime.utcnow()):
+ """Override utils.utcnow to return a constant time."""
+ utcnow.override_time = override_time
+
+
+def clear_time_override():
+ """Remove the overridden time."""
+ utcnow.override_time = None
+
+
+def auth_str_equal(provided, known):
+ """Constant-time string comparison.
+
+ :params provided: the first string
+ :params known: the second string
+
+ :return: True if the strings are equal.
+
+ This function takes two strings and compares them. It is intended to be
+ used when doing a comparison for authentication purposes to help guard
+ against timing attacks. When using the function for this purpose, always
+ provide the user-provided password as the first argument. The time this
+ function will take is always a factor of the length of this string.
+ """
+ result = 0
+ p_len = len(provided)
+ k_len = len(known)
+ for i in xrange(p_len):
+ a = ord(provided[i]) if i < p_len else 0
+ b = ord(known[i]) if i < k_len else 0
+ result |= a ^ b
+ return (p_len == k_len) & (result == 0)
# under the License.
from heat.openstack.common import cfg
-from heat.common import utils
-from heat.common import config
-
-
-rpc_backend_opt = cfg.StrOpt('rpc_backend',
- default='heat.rpc.impl_qpid',
- help="The messaging module to use, defaults to kombu.")
-
-FLAGS = config.FLAGS
-FLAGS.register_opt(rpc_backend_opt)
+from heat.openstack.common import utils
def create_connection(new=True):
_RPCIMPL = None
+def configure(conf):
+ """Delay import of rpc_backend until FLAGS are loaded."""
+ print 'configuring rpc %s' % conf.rpc_backend
+ global _RPCIMPL
+ _RPCIMPL = utils.import_object(conf.rpc_backend)
def _get_impl():
"""Delay import of rpc_backend until FLAGS are loaded."""
global _RPCIMPL
if _RPCIMPL is None:
- _RPCIMPL = utils.import_object(FLAGS.rpc_backend)
+ print 'rpc not configured'
+
return _RPCIMPL
+
import heat.rpc.common as rpc_common
LOG = logging.getLogger(__name__)
+FLAGS = config.FLAGS
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, *args, **kwargs):
self.connection_cls = kwargs.pop("connection_cls", None)
- kwargs.setdefault("max_size", config.FLAGS.rpc_conn_pool_size)
+ kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
def __init__(self, proxy, connection_pool):
self.proxy = proxy
- self.pool = greenpool.GreenPool(config.FLAGS.rpc_thread_pool_size)
+ self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
self.connection_pool = connection_pool
def __call__(self, message_data):
def __init__(self, connection, timeout):
self._connection = connection
self._iterator = connection.iterconsume(
- timeout=timeout or config.FLAGS.rpc_response_timeout)
+ timeout=timeout or FLAGS.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
import copy
import logging
-from heat.common import exception
from heat.openstack.common import cfg
+from heat.openstack.common import exception
from heat.common import config
LOG = logging.getLogger(__name__)
-rpc_opts = [
- cfg.IntOpt('rpc_thread_pool_size',
- default=1024,
- help='Size of RPC thread pool'),
- cfg.IntOpt('rpc_conn_pool_size',
- default=30,
- help='Size of RPC connection pool'),
- cfg.IntOpt('rpc_response_timeout',
- default=60,
- help='Seconds to wait for a response from call or multicall'),
- ]
-config.FLAGS.register_opts(rpc_opts)
-
-
-class RemoteError(exception.NovaException):
+class RemoteError(exception.OpenstackException):
"""Signifies that a remote class has raised an exception.
Contains a string representation of the type of the original exception,
traceback=traceback)
-class Timeout(exception.NovaException):
+class Timeout(exception.OpenstackException):
"""Signifies that a timeout has occurred.
This exception is raised if the rpc_response_timeout is reached while
LOG = logging.getLogger(__name__)
-qpid_opts = [
- cfg.StrOpt('qpid_hostname',
- default='localhost',
- help='Qpid broker hostname'),
- cfg.StrOpt('qpid_port',
- default='5672',
- help='Qpid broker port'),
- cfg.StrOpt('qpid_username',
- default='',
- help='Username for qpid connection'),
- cfg.StrOpt('qpid_password',
- default='',
- help='Password for qpid connection'),
- cfg.StrOpt('qpid_sasl_mechanisms',
- default='',
- help='Space separated list of SASL mechanisms to use for auth'),
- cfg.BoolOpt('qpid_reconnect',
- default=True,
- help='Automatically reconnect'),
- cfg.IntOpt('qpid_reconnect_timeout',
- default=0,
- help='Reconnection timeout in seconds'),
- cfg.IntOpt('qpid_reconnect_limit',
- default=0,
- help='Max reconnections before giving up'),
- cfg.IntOpt('qpid_reconnect_interval_min',
- default=0,
- help='Minimum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval_max',
- default=0,
- help='Maximum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval',
- default=0,
- help='Equivalent to setting max and min to the same value'),
- cfg.IntOpt('qpid_heartbeat',
- default=5,
- help='Seconds between connection keepalive heartbeats'),
- cfg.StrOpt('qpid_protocol',
- default='tcp',
- help="Transport to use, either 'tcp' or 'ssl'"),
- cfg.BoolOpt('qpid_tcp_nodelay',
- default=True,
- help='Disable Nagle algorithm'),
- ]
-
-FLAGS = config.FLAGS
-FLAGS.register_opts(qpid_opts)
-
-
class ConsumerBase(object):
"""Consumer base class."""
"""
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (FLAGS.control_exchange, topic), {},
+ "%s/%s" % (config.FLAGS.control_exchange, topic), {},
topic, {})
"""init a 'topic' publisher.
"""
super(TopicPublisher, self).__init__(session,
- "%s/%s" % (FLAGS.control_exchange, topic))
+ "%s/%s" % (config.FLAGS.control_exchange, topic))
class FanoutPublisher(Publisher):
"""init a 'topic' publisher.
"""
super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (FLAGS.control_exchange, topic),
+ "%s/%s" % (config.FLAGS.control_exchange, topic),
{"durable": True})
if server_params is None:
server_params = {}
- default_params = dict(hostname=FLAGS.qpid_hostname,
- port=FLAGS.qpid_port,
- username=FLAGS.qpid_username,
- password=FLAGS.qpid_password)
+ default_params = dict(hostname=config.FLAGS.qpid_hostname,
+ port=config.FLAGS.qpid_port,
+ username=config.FLAGS.qpid_username,
+ password=config.FLAGS.qpid_password)
params = server_params
for key in default_params.keys():
# before we call open
self.connection.username = params['username']
self.connection.password = params['password']
- self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
- self.connection.reconnect = FLAGS.qpid_reconnect
- if FLAGS.qpid_reconnect_timeout:
- self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
- if FLAGS.qpid_reconnect_limit:
- self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
- if FLAGS.qpid_reconnect_interval_max:
+ self.connection.sasl_mechanisms = config.FLAGS.qpid_sasl_mechanisms
+ self.connection.reconnect = config.FLAGS.qpid_reconnect
+ if config.FLAGS.qpid_reconnect_timeout:
+ self.connection.reconnect_timeout = config.FLAGS.qpid_reconnect_timeout
+ if config.FLAGS.qpid_reconnect_limit:
+ self.connection.reconnect_limit = config.FLAGS.qpid_reconnect_limit
+ if config.FLAGS.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
- FLAGS.qpid_reconnect_interval_max)
- if FLAGS.qpid_reconnect_interval_min:
+ config.FLAGS.qpid_reconnect_interval_max)
+ if config.FLAGS.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
- FLAGS.qpid_reconnect_interval_min)
- if FLAGS.qpid_reconnect_interval:
- self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
- self.connection.hearbeat = FLAGS.qpid_heartbeat
- self.connection.protocol = FLAGS.qpid_protocol
- self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
+ config.FLAGS.qpid_reconnect_interval_min)
+ if config.FLAGS.qpid_reconnect_interval:
+ self.connection.reconnect_interval = config.FLAGS.qpid_reconnect_interval
+ self.connection.hearbeat = config.FLAGS.qpid_heartbeat
+ self.connection.protocol = config.FLAGS.qpid_protocol
+ self.connection.tcp_nodelay = config.FLAGS.qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
LOG.error(_('Unable to connect to AMQP server: %s ') % e)
- time.sleep(FLAGS.qpid_reconnect_interval or 1)
+ time.sleep(config.FLAGS.qpid_reconnect_interval or 1)
else:
break
import greenlet
from heat.openstack.common import cfg
+from heat.openstack.common import utils
-from heat.common import utils
-from heat.common import config
+from heat.common import utils as heat_utils
from heat.common import exception
from heat import context
LOG = logging.getLogger(__name__)
-service_opts = [
- cfg.IntOpt('report_interval',
- default=10,
- help='seconds between nodes reporting state to datastore'),
- cfg.IntOpt('periodic_interval',
- default=60,
- help='seconds between running periodic tasks'),
- cfg.StrOpt('ec2_listen',
- default="0.0.0.0",
- help='IP address for EC2 API to listen'),
- cfg.IntOpt('ec2_listen_port',
- default=8773,
- help='port for ec2 api to listen'),
- cfg.StrOpt('osapi_compute_listen',
- default="0.0.0.0",
- help='IP address for OpenStack API to listen'),
- cfg.IntOpt('osapi_compute_listen_port',
- default=8774,
- help='list port for osapi compute'),
- cfg.StrOpt('metadata_manager',
- default='nova.api.manager.MetadataManager',
- help='OpenStack metadata service manager'),
- cfg.StrOpt('metadata_listen',
- default="0.0.0.0",
- help='IP address for metadata api to listen'),
- cfg.IntOpt('metadata_listen_port',
- default=8775,
- help='port for metadata api to listen'),
- cfg.StrOpt('osapi_volume_listen',
- default="0.0.0.0",
- help='IP address for OpenStack Volume API to listen'),
- cfg.IntOpt('osapi_volume_listen_port',
- default=8776,
- help='port for os volume api to listen'),
- ]
-
-FLAGS = config.FLAGS
-FLAGS.register_opts(service_opts)
-
-
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
self.conn.consume_in_thread()
if self.periodic_interval:
- periodic = utils.LoopingCall(self.periodic_tasks)
+ periodic = heat_utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval, now=False)
self.timers.append(periodic)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
- periodic_interval=None):
+ periodic_interval=None, config=None):
"""Instantiates class and passes back application object.
:param host: defaults to FLAGS.host
:param periodic_interval: defaults to FLAGS.periodic_interval
"""
+ global FLAGS
+ FLAGS = config
if not host:
host = FLAGS.host
if not binary:
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,local,iniparser
+modules=cfg,local,iniparser,utils,exception
# The base module to hold the copy of openstack.common
base=heat