From: Gary Kotton Date: Thu, 8 Nov 2012 21:16:01 +0000 (+0000) Subject: Update latest openstack-common code X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=8eb832715117b9f79d6d20d2dd17b6ff7efe4473;p=openstack-build%2Fneutron-build.git Update latest openstack-common code This fixes bug 1073999 (quantum/openstack/common/rpc/impl_qpid.py) In addition to this the common code is updated. Change-Id: I41223963baf34772edcd0d6d7ef5686a5fad1035 --- diff --git a/quantum/openstack/common/cfg.py b/quantum/openstack/common/cfg.py index 4fcd24251..c760ede2a 100644 --- a/quantum/openstack/common/cfg.py +++ b/quantum/openstack/common/cfg.py @@ -236,7 +236,7 @@ log files: This module also contains a global instance of the CommonConfigOpts class in order to support a common usage pattern in OpenStack: - from openstack.common import cfg + from quantum.openstack.common import cfg opts = [ cfg.StrOpt('bind_host', default='0.0.0.0'), diff --git a/quantum/openstack/common/exception.py b/quantum/openstack/common/exception.py index ba32da550..4866de2fd 100644 --- a/quantum/openstack/common/exception.py +++ b/quantum/openstack/common/exception.py @@ -22,18 +22,6 @@ Exceptions common to OpenStack projects import logging -class ProcessExecutionError(IOError): - def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None, - description=None): - if description is None: - description = "Unexpected error while running command." - if exit_code is None: - exit_code = '-' - message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % ( - description, cmd, exit_code, stdout, stderr) - IOError.__init__(self, message) - - class Error(Exception): def __init__(self, message=None): super(Error, self).__init__(message) diff --git a/quantum/openstack/common/gettextutils.py b/quantum/openstack/common/gettextutils.py index 235350cc4..81076c1f3 100644 --- a/quantum/openstack/common/gettextutils.py +++ b/quantum/openstack/common/gettextutils.py @@ -20,7 +20,7 @@ gettext for openstack-common modules. Usual usage in an openstack.common module: - from openstack.common.gettextutils import _ + from quantum.openstack.common.gettextutils import _ """ import gettext diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index acdf96def..9a9dfc577 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -257,7 +257,7 @@ class JSONFormatter(logging.Formatter): class PublishErrorsHandler(logging.Handler): def emit(self, record): - if ('openstack.common.notifier.log_notifier' in + if ('quantum.openstack.common.notifier.log_notifier' in CONF.notification_driver): return notifier.api.notify(None, 'error.publisher', diff --git a/quantum/openstack/common/rpc/__init__.py b/quantum/openstack/common/rpc/__init__.py index 89940ef1a..f26919306 100644 --- a/quantum/openstack/common/rpc/__init__.py +++ b/quantum/openstack/common/rpc/__init__.py @@ -250,7 +250,7 @@ def queue_get_for(context, topic, host): Messages sent to the 'foo.' topic are sent to the nova-foo service on . """ - return '%s.%s' % (topic, host) + return '%s.%s' % (topic, host) if host else topic _RPCIMPL = None diff --git a/quantum/openstack/common/rpc/impl_kombu.py b/quantum/openstack/common/rpc/impl_kombu.py index facf89886..b0b292794 100644 --- a/quantum/openstack/common/rpc/impl_kombu.py +++ b/quantum/openstack/common/rpc/impl_kombu.py @@ -267,6 +267,7 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, 'exclusive': True} options.update(kwargs) @@ -408,18 +409,18 @@ class Connection(object): hostname, port = network_utils.parse_host_port( adr, default_port=self.conf.rabbit_port) - params = {} + params = { + 'hostname': hostname, + 'port': port, + 'userid': self.conf.rabbit_userid, + 'password': self.conf.rabbit_password, + 'virtual_host': self.conf.rabbit_virtual_host, + } for sp_key, value in server_params.iteritems(): p_key = server_params_to_kombu_params.get(sp_key, sp_key) params[p_key] = value - params.setdefault('hostname', hostname) - params.setdefault('port', port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) - if self.conf.fake_rabbit: params['transport'] = 'memory' if self.conf.rabbit_use_ssl: @@ -776,7 +777,7 @@ def cast_to_server(conf, context, server_params, topic, msg): def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.cast_to_server( + return rpc_amqp.fanout_cast_to_server( conf, context, server_params, topic, msg, rpc_amqp.get_connection_pool(conf, Connection)) diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index efdf21211..d8813bbfd 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -50,24 +50,6 @@ qpid_opts = [ cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), - cfg.BoolOpt('qpid_reconnect', - default=True, - help='Automatically reconnect'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', default=60, help='Seconds between connection keepalive heartbeats'), @@ -294,50 +276,36 @@ class Connection(object): self.consumer_thread = None self.conf = conf - if server_params is None: - server_params = {} - - default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) - - params = server_params - for key in default_params.keys(): - params.setdefault(key, default_params[key]) + params = { + 'hostname': self.conf.qpid_hostname, + 'port': self.conf.qpid_port, + 'username': self.conf.qpid_username, + 'password': self.conf.qpid_password, + } + params.update(server_params or {}) self.broker = params['hostname'] + ":" + str(params['port']) + self.username = params['username'] + self.password = params['password'] + self.connection_create() + self.reconnect() + + def connection_create(self): # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = params['username'] - self.connection.password = params['password'] + self.connection.username = self.username + self.connection.password = self.password + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms - self.connection.reconnect = self.conf.qpid_reconnect - if self.conf.qpid_reconnect_timeout: - self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) - if self.conf.qpid_reconnect_limit: - self.connection.reconnect_limit = self.conf.qpid_reconnect_limit - if self.conf.qpid_reconnect_interval_max: - self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) - if self.conf.qpid_reconnect_interval_min: - self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) - if self.conf.qpid_reconnect_interval: - self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + # Reconnection is done by self.reconnect() + self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay - # Open is part of reconnect - - # NOTE(WGH) not sure we need this with the reconnect flags - self.reconnect() - def _register_consumer(self, consumer): self.consumers[str(consumer.get_receiver())] = consumer @@ -352,12 +320,18 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + delay = 1 while True: try: + self.connection_create() self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: - LOG.error(_('Unable to connect to AMQP server: %s'), e) - time.sleep(self.conf.qpid_reconnect_interval or 1) + msg_dict = dict(e=e, delay=delay) + msg = _("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(2 * delay, 60) else: break @@ -365,10 +339,14 @@ class Connection(object): self.session = self.connection.session() - for consumer in self.consumers.itervalues(): - consumer.reconnect(self.session) - if self.consumers: + consumers = self.consumers + self.consumers = {} + + for consumer in consumers.itervalues(): + consumer.reconnect(self.session) + self._register_consumer(consumer) + LOG.debug(_("Re-established AMQP queues")) def ensure(self, error_callback, method, *args, **kwargs): diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index f4fcce038..02a44d21b 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -546,7 +546,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): timeout = timeout or CONF.rpc_response_timeout # The msg_id is used to track replies. - msg_id = str(uuid.uuid4().hex) + msg_id = uuid.uuid4().hex # Replies always come into the reply service. reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host diff --git a/quantum/openstack/common/threadgroup.py b/quantum/openstack/common/threadgroup.py index 9d39d0498..0aea30ff6 100644 --- a/quantum/openstack/common/threadgroup.py +++ b/quantum/openstack/common/threadgroup.py @@ -47,7 +47,7 @@ class Thread(object): self.thread.link(_thread_done, group=group, thread=self) def stop(self): - self.thread.cancel() + self.thread.kill() def wait(self): return self.thread.wait() diff --git a/quantum/openstack/common/uuidutils.py b/quantum/openstack/common/uuidutils.py index 51042a798..7608acb94 100644 --- a/quantum/openstack/common/uuidutils.py +++ b/quantum/openstack/common/uuidutils.py @@ -22,6 +22,10 @@ UUID related utilities and helper functions. import uuid +def generate_uuid(): + return str(uuid.uuid4()) + + def is_uuid_like(val): """Returns validation of a value as a UUID.