review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Latest OSLO updates
authorGary Kotton <gkotton@redhat.com>
Mon, 7 Jan 2013 13:52:21 +0000 (13:52 +0000)
committerGary Kotton <gkotton@redhat.com>
Sat, 12 Jan 2013 06:16:17 +0000 (06:16 +0000)
Change-Id: Ibf223203c8b34f614357fa4539d0dfa953765d6b

29 files changed:
quantum/common/config.py
quantum/openstack/common/eventlet_backdoor.py
quantum/openstack/common/exception.py
quantum/openstack/common/excutils.py
quantum/openstack/common/importutils.py
quantum/openstack/common/jsonutils.py
quantum/openstack/common/lockutils.py
quantum/openstack/common/log.py
quantum/openstack/common/loopingcall.py
quantum/openstack/common/notifier/api.py
quantum/openstack/common/notifier/rpc_notifier.py
quantum/openstack/common/notifier/rpc_notifier2.py [new file with mode: 0644]
quantum/openstack/common/periodic_task.py
quantum/openstack/common/rpc/__init__.py
quantum/openstack/common/rpc/amqp.py
quantum/openstack/common/rpc/common.py
quantum/openstack/common/rpc/dispatcher.py
quantum/openstack/common/rpc/impl_fake.py
quantum/openstack/common/rpc/impl_kombu.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/rpc/impl_zmq.py
quantum/openstack/common/rpc/matchmaker.py
quantum/openstack/common/rpc/service.py
quantum/openstack/common/service.py
quantum/openstack/common/setup.py
quantum/openstack/common/threadgroup.py
quantum/openstack/common/timeutils.py
quantum/openstack/common/version.py
tools/pip-requires

index 079714de6120fff71d7f9bae26a94068700b6806..02931510823499f3babca5cc7816e27f59634b7e 100644 (file)
@@ -27,6 +27,7 @@ from quantum.api.v2 import attributes
 from quantum.common import utils
 from quantum.openstack.common import cfg
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
 from quantum.version import version_info as quantum_version
 
 
@@ -49,9 +50,6 @@ core_opts = [
     cfg.IntOpt('max_subnet_host_routes', default=20),
     cfg.IntOpt('dhcp_lease_duration', default=120),
     cfg.BoolOpt('allow_overlapping_ips', default=False),
-    cfg.StrOpt('control_exchange',
-               default='quantum',
-               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
     cfg.StrOpt('host', default=utils.get_hostname()),
     cfg.BoolOpt('force_gateway_on_subnet', default=False,
                 help=_("Ensure that configured gateway is on subnet")),
@@ -67,6 +65,7 @@ cfg.CONF.register_cli_opts(core_cli_opts)
 
 
 def parse(args):
+    rpc.set_defaults(control_exchange='quantum')
     cfg.CONF(args=args, project='quantum',
              version='%%prog %s' % quantum_version.version_string_with_vcs())
 
index cee9bb55d541cf6d164d8704d99104b8d24b7c91..6af0f9da496f9ed2f9612af59613b1776aa440b3 100644 (file)
@@ -46,7 +46,7 @@ def _find_objects(t):
 
 
 def _print_greenthreads():
-    for i, gt in enumerate(find_objects(greenlet.greenlet)):
+    for i, gt in enumerate(_find_objects(greenlet.greenlet)):
         print i, gt
         traceback.print_stack(gt.gr_frame)
         print
@@ -61,7 +61,7 @@ def initialize_if_enabled():
     }
 
     if CONF.backdoor_port is None:
-        return
+        return None
 
     # NOTE(johannes): The standard sys.displayhook will print the value of
     # the last expression and set it to __builtin__._, which overwrites
@@ -73,6 +73,8 @@ def initialize_if_enabled():
             pprint.pprint(val)
     sys.displayhook = displayhook
 
-    eventlet.spawn_n(eventlet.backdoor.backdoor_server,
-                     eventlet.listen(('localhost', CONF.backdoor_port)),
+    sock = eventlet.listen(('localhost', CONF.backdoor_port))
+    port = sock.getsockname()[1]
+    eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
                      locals=backdoor_locals)
+    return port
index 4866de2fd2beaeee9b8227d7cd3e2227a14eacd6..20634b31bbc20fe00e8f5829819c06aae93ed5ae 100644 (file)
@@ -21,6 +21,8 @@ Exceptions common to OpenStack projects
 
 import logging
 
+from quantum.openstack.common.gettextutils import _
+
 
 class Error(Exception):
     def __init__(self, message=None):
@@ -97,7 +99,7 @@ def wrap_exception(f):
         except Exception, e:
             if not isinstance(e, Error):
                 #exc_type, exc_value, exc_traceback = sys.exc_info()
-                logging.exception('Uncaught exception')
+                logging.exception(_('Uncaught exception'))
                 #logging.error(traceback.extract_stack(exc_traceback))
                 raise Error(str(e))
             raise
index 5dd48301760e4778904bdc32f2d748e0e9ef7980..6f6a81a0b9c2ee9ebd83ba20fcd858f849235515 100644 (file)
@@ -24,6 +24,8 @@ import logging
 import sys
 import traceback
 
+from quantum.openstack.common.gettextutils import _
+
 
 @contextlib.contextmanager
 def save_and_reraise_exception():
@@ -43,7 +45,7 @@ def save_and_reraise_exception():
     try:
         yield
     except Exception:
-        logging.error('Original exception being dropped: %s' %
-                      (traceback.format_exception(type_, value, tb)))
+        logging.error(_('Original exception being dropped: %s'),
+                      traceback.format_exception(type_, value, tb))
         raise
     raise type_, value, tb
index f45372b4dba607251b5a79795be42d94b94da8f9..2a28b455e85c811abc3a509e0d3f9775a815cff4 100644 (file)
@@ -29,7 +29,7 @@ def import_class(import_str):
     try:
         __import__(mod_str)
         return getattr(sys.modules[mod_str], class_str)
-    except (ValueError, AttributeError), exc:
+    except (ValueError, AttributeError):
         raise ImportError('Class %s cannot be found (%s)' %
                           (class_str,
                            traceback.format_exception(*sys.exc_info())))
index 9324a61fa343e466feb1e4dc509fed7bc3e0359b..72f39fa1b998763cf6fd384dbf3a7ec7bedf0db7 100644 (file)
@@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0):
                                 level=level + 1)
         else:
             return value
-    except TypeError, e:
+    except TypeError:
         # Class objects are tricky since they may define something like
         # __iter__ defined but it isn't callable as list().
         return unicode(value)
index 9f4eddf57c95bd92e8539b8e2c97039dc97bb433..ead7e13c7ffae4909833d388273a24c7abc33dc2 100644 (file)
@@ -28,6 +28,7 @@ from eventlet import semaphore
 
 from quantum.openstack.common import cfg
 from quantum.openstack.common import fileutils
+from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import log as logging
 
 
index 9a9dfc577fa085753ebce06127be0c2fdb1a08f9..0640c307111bcfc28db56332ad50b5069302458e 100644 (file)
@@ -49,19 +49,20 @@ from quantum.openstack.common import notifier
 
 log_opts = [
     cfg.StrOpt('logging_context_format_string',
-               default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
-                       '%(user_id)s %(project_id)s] %(instance)s'
+               default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
+                       '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
                        '%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
-               default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
-                       ' %(instance)s%(message)s',
+               default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
+                       '%(name)s [-] %(instance)s%(message)s',
                help='format string to use for log messages without context'),
     cfg.StrOpt('logging_debug_format_suffix',
                default='%(funcName)s %(pathname)s:%(lineno)d',
                help='data to append to log format when level is DEBUG'),
     cfg.StrOpt('logging_exception_prefix',
-               default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
+               default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
+               '%(instance)s',
                help='prefix each line of exception output with this format'),
     cfg.ListOpt('default_log_levels',
                 default=[
@@ -174,7 +175,7 @@ class ContextAdapter(logging.LoggerAdapter):
         self.log(logging.AUDIT, msg, *args, **kwargs)
 
     def deprecated(self, msg, *args, **kwargs):
-        stdmsg = _("Deprecated Config: %s") % msg
+        stdmsg = _("Deprecated: %s") % msg
         if CONF.fatal_deprecations:
             self.critical(stdmsg, *args, **kwargs)
             raise DeprecatedConfig(msg=stdmsg)
@@ -289,6 +290,12 @@ def setup(product_name):
         _setup_logging_from_conf(product_name)
 
 
+def set_defaults(logging_context_format_string):
+    cfg.set_defaults(log_opts,
+                     logging_context_format_string=
+                     logging_context_format_string)
+
+
 def _find_facility_from_conf():
     facility_names = logging.handlers.SysLogHandler.facility_names
     facility = getattr(logging.handlers.SysLogHandler,
index a07216768d35a4f7d6c2a8442ca960464ed68825..efce59528e36e83a0482d673c93870f9344c1cc1 100644 (file)
@@ -24,6 +24,7 @@ from eventlet import greenthread
 
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import timeutils
 
 LOG = logging.getLogger(__name__)
 
@@ -62,10 +63,16 @@ class LoopingCall(object):
 
             try:
                 while self._running:
+                    start = timeutils.utcnow()
                     self.f(*self.args, **self.kw)
+                    end = timeutils.utcnow()
                     if not self._running:
                         break
-                    greenthread.sleep(interval)
+                    delay = interval - timeutils.delta_seconds(start, end)
+                    if delay <= 0:
+                        LOG.warn(_('task run outlasted interval by %s sec') %
+                                 -delay)
+                    greenthread.sleep(delay if delay > 0 else 0)
             except LoopingCallDone, e:
                 self.stop()
                 done.send(e.retvalue)
index d3d4d0f6d092c5b3c5f6a88a266ee5cf926a74b1..09ab447a36cf7c129ad1a21c16e73d34d2221911 100644 (file)
@@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
     for driver in _get_drivers():
         try:
             driver.notify(context, msg)
-        except Exception, e:
+        except Exception as e:
             LOG.exception(_("Problem '%(e)s' attempting to "
                             "send to notification system. "
-                            "Payload=%(payload)s") % locals())
+                            "Payload=%(payload)s")
+                          % dict(e=e, payload=payload))
 
 
 _drivers = None
@@ -166,7 +167,7 @@ def add_driver(notification_driver):
         try:
             driver = importutils.import_module(notification_driver)
             _drivers[notification_driver] = driver
-        except ImportError as e:
+        except ImportError:
             LOG.exception(_("Failed to load notifier %s. "
                             "These notifications will not be sent.") %
                           notification_driver)
index c1650537da2fb17a64a0d63bc2e4139e71b956f1..213779777b909d7c21c14ff663a0e865cc6cad1e 100644 (file)
@@ -41,6 +41,6 @@ def notify(context, message):
         topic = '%s.%s' % (topic, priority)
         try:
             rpc.notify(context, topic, message)
-        except Exception, e:
+        except Exception:
             LOG.exception(_("Could not send notification to %(topic)s. "
                             "Payload=%(message)s"), locals())
diff --git a/quantum/openstack/common/notifier/rpc_notifier2.py b/quantum/openstack/common/notifier/rpc_notifier2.py
new file mode 100644 (file)
index 0000000..b0b9e66
--- /dev/null
@@ -0,0 +1,51 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+'''messaging based notification driver, with message envelopes'''
+
+from quantum.openstack.common import cfg
+from quantum.openstack.common import context as req_context
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+    'topics', default=['notifications', ],
+    help='AMQP topic(s) used for openstack notifications')
+
+opt_group = cfg.OptGroup(name='rpc_notifier2',
+                         title='Options for rpc_notifier2')
+
+CONF = cfg.CONF
+CONF.register_group(opt_group)
+CONF.register_opt(notification_topic_opt, opt_group)
+
+
+def notify(context, message):
+    """Sends a notification via RPC"""
+    if not context:
+        context = req_context.get_admin_context()
+    priority = message.get('priority',
+                           CONF.default_notification_level)
+    priority = priority.lower()
+    for topic in CONF.rpc_notifier2.topics:
+        topic = '%s.%s' % (topic, priority)
+        try:
+            rpc.notify(context, topic, message, envelope=True)
+        except Exception:
+            LOG.exception(_("Could not send notification to %(topic)s. "
+                            "Payload=%(message)s"), locals())
index ba2f5119a9c76f0ed0f897f86d38f4ad67a2f2fb..43d125b50d3eda1aae8624f15e55621039b3b128 100644 (file)
@@ -95,17 +95,21 @@ class PeriodicTasks(object):
             ticks_to_skip = self._ticks_to_skip[task_name]
             if ticks_to_skip > 0:
                 LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
-                            " ticks left until next run"), locals())
+                            " ticks left until next run"),
+                          dict(full_task_name=full_task_name,
+                               ticks_to_skip=ticks_to_skip))
                 self._ticks_to_skip[task_name] -= 1
                 continue
 
             self._ticks_to_skip[task_name] = task._ticks_between_runs
-            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+            LOG.debug(_("Running periodic task %(full_task_name)s"),
+                      dict(full_task_name=full_task_name))
 
             try:
                 task(self, context)
             except Exception as e:
                 if raise_on_error:
                     raise
-                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
-                              locals())
+                LOG.exception(_("Error during %(full_task_name)s:"
+                                " %(e)s"),
+                              dict(e=e, full_task_name=full_task_name))
index f26919306d5b9f7c969d1dff014c914bd6bb7050..8ce6456d5b58d0972536b4b8dda2f44f5e196616 100644 (file)
@@ -50,25 +50,26 @@ rpc_opts = [
                 default=['quantum.openstack.common.exception',
                          'nova.exception',
                          'cinder.exception',
+                         'exceptions',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
                      'upon receiving exception data from an rpc call.'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
-    #
-    # The following options are not registered here, but are expected to be
-    # present. The project using this library must register these options with
-    # the configuration so that project-specific defaults may be defined.
-    #
-    #cfg.StrOpt('control_exchange',
-    #           default='nova',
-    #           help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+    cfg.StrOpt('control_exchange',
+               default='openstack',
+               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
 ]
 
 cfg.CONF.register_opts(rpc_opts)
 
 
+def set_defaults(control_exchange):
+    cfg.set_defaults(rpc_opts,
+                     control_exchange=control_exchange)
+
+
 def create_connection(new=True):
     """Create a connection to the message bus used for rpc.
 
@@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
     return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
 
 
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
     """Send notification event.
 
     :param context: Information that identifies the user that has made this
                     request.
     :param topic: The topic to send the notification to.
     :param msg: This is a dict of content of event.
+    :param envelope: Set to True to enable message envelope for notifications.
 
     :returns: None
     """
-    return _get_impl().notify(cfg.CONF, context, topic, msg)
+    return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
 
 
 def cleanup():
index e763369622b03faa5bd758004971c0e1351e30e3..42fce2dd2cdc2a1fd1266877e8eed94700f8df94 100644 (file)
@@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
 """
 
 import inspect
-import logging
 import sys
 import uuid
 
@@ -34,10 +33,10 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
 
-from quantum.openstack.common import cfg
 from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import local
+from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import common as rpc_common
 
 
@@ -55,7 +54,7 @@ class Pool(pools.Pool):
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
-        LOG.debug('Pool creating new connection')
+        LOG.debug(_('Pool creating new connection'))
         return self.connection_cls(self.conf)
 
     def empty(self):
@@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
 
 
 def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
-              ending=False):
+              ending=False, log_failure=True):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
@@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
     """
     with ConnectionContext(conf, connection_pool) as conn:
         if failure:
-            failure = rpc_common.serialize_remote_exception(failure)
+            failure = rpc_common.serialize_remote_exception(failure,
+                                                            log_failure)
 
         try:
             msg = {'result': reply, 'failure': failure}
@@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
-        conn.direct_send(msg_id, msg)
+        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
 
 
 class RpcContext(rpc_common.CommonRpcContext):
@@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
         return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
-              connection_pool=None):
+              connection_pool=None, log_failure=True):
         if self.msg_id:
             msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
-                      ending)
+                      ending, log_failure)
             if ending:
                 self.msg_id = None
 
@@ -282,11 +282,21 @@ class ProxyCallback(object):
                 ctxt.reply(rval, None, connection_pool=self.connection_pool)
             # This final None tells multicall that it is done.
             ctxt.reply(ending=True, connection_pool=self.connection_pool)
-        except Exception as e:
-            LOG.exception('Exception during message handling')
+        except rpc_common.ClientException as e:
+            LOG.debug(_('Expected exception during message handling (%s)') %
+                      e._exc_info[1])
+            ctxt.reply(None, e._exc_info,
+                       connection_pool=self.connection_pool,
+                       log_failure=False)
+        except Exception:
+            LOG.exception(_('Exception during message handling'))
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
 
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
 
 class MulticallWaiter(object):
     def __init__(self, conf, connection, timeout):
@@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     # that will continue to use the connection.  When it's done,
     # connection.close() will get called which will put it back into
     # the pool
-    LOG.debug(_('Making asynchronous call on %s ...'), topic)
+    LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
@@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     conn = ConnectionContext(conf, connection_pool)
     wait_msg = MulticallWaiter(conf, conn, timeout)
     conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, msg)
+    conn.topic_send(topic, rpc_common.serialize_msg(msg))
     return wait_msg
 
 
@@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
     LOG.debug(_('Making asynchronous cast on %s...'), topic)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
-        conn.topic_send(topic, msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg))
 
 
 def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
     LOG.debug(_('Making asynchronous fanout cast...'))
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
-        conn.fanout_send(topic, msg)
+        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
 
 
 def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
-        conn.topic_send(topic, msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg))
 
 
 def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
-        conn.fanout_send(topic, msg)
+        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
 
 
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
     """Sends a notification event on a topic."""
-    event_type = msg.get('event_type')
-    LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
+    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+              dict(event_type=msg.get('event_type'),
+                   topic=topic))
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
+        if envelope:
+            msg = rpc_common.serialize_msg(msg, force_envelope=True)
         conn.notify_send(topic, msg)
 
 
@@ -420,7 +433,4 @@ def cleanup(connection_pool):
 
 
 def get_control_exchange(conf):
-    try:
-        return conf.control_exchange
-    except cfg.NoSuchOptError:
-        return 'openstack'
+    return conf.control_exchange
index cade2181cfcde69956df7a12c4ea13b722257022..ca248eca9de455db643f61bca6bdd98cfca7be07 100644 (file)
 #    under the License.
 
 import copy
-import logging
+import sys
 import traceback
 
+from quantum.openstack.common import cfg
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import importutils
 from quantum.openstack.common import jsonutils
 from quantum.openstack.common import local
+from quantum.openstack.common import log as logging
 
 
+CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently.  For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc.  This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer.  See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple.  It is:
+
+    {
+        'quantum.version': <RPC Envelope Version as a String>,
+        'quantum.message': <Application Message Payload, JSON encoded>
+    }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version.  It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload.  The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'quantum.version'
+_MESSAGE_KEY = 'quantum.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
 class RPCException(Exception):
     message = _("An unknown RPC related exception occurred.")
 
@@ -40,7 +83,7 @@ class RPCException(Exception):
             try:
                 message = self.message % kwargs
 
-            except Exception as e:
+            except Exception:
                 # kwargs doesn't match a variable in the message
                 # log the issue and the kwargs
                 LOG.exception(_('Exception in string format operation'))
@@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
                 "this endpoint.")
 
 
+class UnsupportedRpcEnvelopeVersion(RPCException):
+    message = _("Specified RPC envelope version, %(version)s, "
+                "not supported by this endpoint.")
+
+
 class Connection(object):
     """A connection, returned by rpc.create_connection().
 
@@ -164,8 +212,12 @@ class Connection(object):
 
 def _safe_log(log_func, msg, msg_data):
     """Sanitizes the msg_data field before logging."""
-    SANITIZE = {'set_admin_password': ('new_pass',),
-                'run_instance': ('admin_password',), }
+    SANITIZE = {'set_admin_password': [('args', 'new_pass')],
+                'run_instance': [('args', 'admin_password')],
+                'route_message': [('args', 'message', 'args', 'method_info',
+                                   'method_kwargs', 'password'),
+                                  ('args', 'message', 'args', 'method_info',
+                                   'method_kwargs', 'admin_password')]}
 
     has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
     has_context_token = '_context_auth_token' in msg_data
@@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
     msg_data = copy.deepcopy(msg_data)
 
     if has_method:
-        method = msg_data['method']
-        if method in SANITIZE:
-            args_to_sanitize = SANITIZE[method]
-            for arg in args_to_sanitize:
-                try:
-                    msg_data['args'][arg] = "<SANITIZED>"
-                except KeyError:
-                    pass
+        for arg in SANITIZE.get(msg_data['method'], []):
+            try:
+                d = msg_data
+                for elem in arg[:-1]:
+                    d = d[elem]
+                d[arg[-1]] = '<SANITIZED>'
+            except KeyError, e:
+                LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
+                         {'item': arg,
+                          'err': e})
 
     if has_context_token:
         msg_data['_context_auth_token'] = '<SANITIZED>'
@@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
     return log_func(msg, msg_data)
 
 
-def serialize_remote_exception(failure_info):
+def serialize_remote_exception(failure_info, log_failure=True):
     """Prepares exception data to be sent over rpc.
 
     Failure_info should be a sys.exc_info() tuple.
@@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
     """
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
-    LOG.error(_("Returning exception %s to caller"), unicode(failure))
-    LOG.error(tb)
+    if log_failure:
+        LOG.error(_("Returning exception %s to caller"), unicode(failure))
+        LOG.error(tb)
 
     kwargs = {}
     if hasattr(failure, 'kwargs'):
@@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data):
         # we cannot necessarily change an exception message so we must override
         # the __str__ method.
         failure.__class__ = new_ex_type
-    except TypeError as e:
+    except TypeError:
         # NOTE(ameade): If a core exception then just add the traceback to the
         # first exception argument.
         failure.args = (message,) + failure.args[1:]
@@ -309,3 +364,107 @@ class CommonRpcContext(object):
             context.values['read_deleted'] = read_deleted
 
         return context
+
+
+class ClientException(Exception):
+    """This encapsulates some actual exception that is expected to be
+    hit by an RPC proxy object. Merely instantiating it records the
+    current exception information, which will be passed back to the
+    RPC client without exceptional logging."""
+    def __init__(self):
+        self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+    try:
+        return func(*args, **kwargs)
+    except Exception, e:
+        if type(e) in exceptions:
+            raise ClientException()
+        else:
+            raise
+
+
+def client_exceptions(*exceptions):
+    """Decorator for manager methods that raise expected exceptions.
+    Marking a Manager method with this decorator allows the declaration
+    of expected exceptions that the RPC layer should not consider fatal,
+    and not log as if they were generated in a real error scenario. Note
+    that this will cause listed exceptions to be wrapped in a
+    ClientException, which is used internally by the RPC layer."""
+    def outer(func):
+        def inner(*args, **kwargs):
+            return catch_client_exception(exceptions, func, *args, **kwargs)
+        return inner
+    return outer
+
+
+def version_is_compatible(imp_version, version):
+    """Determine whether versions are compatible.
+
+    :param imp_version: The version implemented
+    :param version: The version requested by an incoming message.
+    """
+    version_parts = version.split('.')
+    imp_version_parts = imp_version.split('.')
+    if int(version_parts[0]) != int(imp_version_parts[0]):  # Major
+        return False
+    if int(version_parts[1]) > int(imp_version_parts[1]):  # Minor
+        return False
+    return True
+
+
+def serialize_msg(raw_msg, force_envelope=False):
+    if not _SEND_RPC_ENVELOPE and not force_envelope:
+        return raw_msg
+
+    # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+    # information about this format.
+    msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+           _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+    return msg
+
+
+def deserialize_msg(msg):
+    # NOTE(russellb): Hang on to your hats, this road is about to
+    # get a little bumpy.
+    #
+    # Robustness Principle:
+    #    "Be strict in what you send, liberal in what you accept."
+    #
+    # At this point we have to do a bit of guessing about what it
+    # is we just received.  Here is the set of possibilities:
+    #
+    # 1) We received a dict.  This could be 2 things:
+    #
+    #   a) Inspect it to see if it looks like a standard message envelope.
+    #      If so, great!
+    #
+    #   b) If it doesn't look like a standard message envelope, it could either
+    #      be a notification, or a message from before we added a message
+    #      envelope (referred to as version 1.0).
+    #      Just return the message as-is.
+    #
+    # 2) It's any other non-dict type.  Just return it and hope for the best.
+    #    This case covers return values from rpc.call() from before message
+    #    envelopes were used.  (messages to call a method were always a dict)
+
+    if not isinstance(msg, dict):
+        # See #2 above.
+        return msg
+
+    base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+    if not all(map(lambda key: key in msg, base_envelope_keys)):
+        #  See #1.b above.
+        return msg
+
+    # At this point we think we have the message envelope
+    # format we were expecting. (#1.a above)
+
+    if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+        raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+    raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+    return raw_msg
index 14dd180b688ece41138ae0c38fa1bcc742f4a5b9..de212227cb896660050ef2d449d1c9cc93df56b1 100644 (file)
@@ -41,8 +41,8 @@ server side of the API at the same time.  However, as the code stands today,
 there can be both versioned and unversioned APIs implemented in the same code
 base.
 
-
-EXAMPLES:
+EXAMPLES
+========
 
 Nova was the first project to use versioned rpc APIs.  Consider the compute rpc
 API as an example.  The client side is in nova/compute/rpcapi.py and the server
@@ -50,12 +50,13 @@ side is in nova/compute/manager.py.
 
 
 Example 1) Adding a new method.
+-------------------------------
 
 Adding a new method is a backwards compatible change.  It should be added to
 nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
 X.Y+1.  On the client side, the new method in nova/compute/rpcapi.py should
 have a specific version specified to indicate the minimum API version that must
-be implemented for the method to be supported.  For example:
+be implemented for the method to be supported.  For example::
 
     def get_host_uptime(self, ctxt, host):
         topic = _compute_topic(self.topic, ctxt, host, None)
@@ -67,10 +68,11 @@ get_host_uptime() method.
 
 
 Example 2) Adding a new parameter.
+----------------------------------
 
 Adding a new parameter to an rpc method can be made backwards compatible.  The
 RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
-The implementation of the method must not expect the parameter to be present.
+The implementation of the method must not expect the parameter to be present.::
 
     def some_remote_method(self, arg1, arg2, newarg=None):
         # The code needs to deal with newarg=None for cases
@@ -101,21 +103,6 @@ class RpcDispatcher(object):
         self.callbacks = callbacks
         super(RpcDispatcher, self).__init__()
 
-    @staticmethod
-    def _is_compatible(mversion, version):
-        """Determine whether versions are compatible.
-
-        :param mversion: The API version implemented by a callback.
-        :param version: The API version requested by an incoming message.
-        """
-        version_parts = version.split('.')
-        mversion_parts = mversion.split('.')
-        if int(version_parts[0]) != int(mversion_parts[0]):  # Major
-            return False
-        if int(version_parts[1]) > int(mversion_parts[1]):  # Minor
-            return False
-        return True
-
     def dispatch(self, ctxt, version, method, **kwargs):
         """Dispatch a message based on a requested version.
 
@@ -137,7 +124,8 @@ class RpcDispatcher(object):
                 rpc_api_version = proxyobj.RPC_API_VERSION
             else:
                 rpc_api_version = '1.0'
-            is_compatible = self._is_compatible(rpc_api_version, version)
+            is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+                                                             version)
             had_compatible = had_compatible or is_compatible
             if not hasattr(proxyobj, method):
                 continue
index 49124ef625354e3548fde0b7d4092533728a2ec1..af1406615accb74f872911bf195fc05507496e1f 100644 (file)
@@ -18,11 +18,15 @@ queues.  Casts will block, but this is very useful for tests.
 """
 
 import inspect
+# NOTE(russellb): We specifically want to use json, not our own jsonutils.
+# jsonutils has some extra logic to automatically convert objects to primitive
+# types so that they can be serialized.  We want to catch all cases where
+# non-primitive types make it into this code and treat it as an error.
+import json
 import time
 
 import eventlet
 
-from quantum.openstack.common import jsonutils
 from quantum.openstack.common.rpc import common as rpc_common
 
 CONSUMERS = {}
@@ -75,6 +79,8 @@ class Consumer(object):
                     else:
                         res.append(rval)
                 done.send(res)
+            except rpc_common.ClientException as e:
+                done.send_exception(e._exc_info[1])
             except Exception as e:
                 done.send_exception(e)
 
@@ -121,7 +127,7 @@ def create_connection(conf, new=True):
 
 def check_serialize(msg):
     """Make sure a message intended for rpc can be serialized."""
-    jsonutils.dumps(msg)
+    json.dumps(msg)
 
 
 def multicall(conf, context, topic, msg, timeout=None):
@@ -154,6 +160,7 @@ def call(conf, context, topic, msg, timeout=None):
 
 
 def cast(conf, context, topic, msg):
+    check_serialize(msg)
     try:
         call(conf, context, topic, msg)
     except Exception:
index b0b292794c31ced5dda6885444031b5d9791577f..9e2620ffd72c0afce1c5ced6c51c550b1372d483 100644 (file)
@@ -162,7 +162,8 @@ class ConsumerBase(object):
         def _callback(raw_message):
             message = self.channel.message_to_python(raw_message)
             try:
-                callback(message.payload)
+                msg = rpc_common.deserialize_msg(message.payload)
+                callback(msg)
                 message.ack()
             except Exception:
                 LOG.exception(_("Failed to process message... skipping it."))
@@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
         # Default options
         options = {'durable': False,
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         exchange = kombu.entity.Exchange(name=msg_id,
                                          type='direct',
@@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
         options = {'durable': False,
                    'queue_arguments': _get_queue_arguments(conf),
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
                                          durable=options['durable'],
@@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
 
         options = {'durable': False,
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
                                               type='direct', **options)
@@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
         """
         options = {'durable': False,
                    'auto_delete': True,
-                   'exclusive': True}
+                   'exclusive': False}
         options.update(kwargs)
         super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
                                               None, type='fanout', **options)
@@ -387,6 +388,7 @@ class Connection(object):
     def __init__(self, conf, server_params=None):
         self.consumers = []
         self.consumer_thread = None
+        self.proxy_callbacks = []
         self.conf = conf
         self.max_retries = self.conf.rabbit_max_retries
         # Try forever?
@@ -469,7 +471,7 @@ class Connection(object):
             LOG.info(_("Reconnecting to AMQP server on "
                      "%(hostname)s:%(port)d") % params)
             try:
-                self.connection.close()
+                self.connection.release()
             except self.connection_errors:
                 pass
             # Setting this in case the next statement fails, though
@@ -573,12 +575,14 @@ class Connection(object):
     def close(self):
         """Close/release this connection"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.connection.release()
         self.connection = None
 
     def reset(self):
         """Reset a connection so it can be used again"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.channel.close()
         self.channel = self.connection.channel()
         # work around 'memory' transport bug in 1.1.3
@@ -644,6 +648,11 @@ class Connection(object):
                 pass
             self.consumer_thread = None
 
+    def wait_on_proxy_callbacks(self):
+        """Wait for all proxy callback threads to exit."""
+        for proxy_cb in self.proxy_callbacks:
+            proxy_cb.wait()
+
     def publisher_send(self, cls, topic, msg, **kwargs):
         """Send to a publisher based on the publisher class"""
 
@@ -719,6 +728,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
 
         if fanout:
             self.declare_fanout_consumer(topic, proxy_cb)
@@ -730,6 +740,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
 
@@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
         rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
     """Sends a notification event on a topic."""
     return rpc_amqp.notify(
         conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
+        rpc_amqp.get_connection_pool(conf, Connection),
+        envelope)
 
 
 def cleanup():
index d8813bbfdf838daa58859616459c50c3e02cf860..16f21a4e58da4eeb388c08d1b04adc38f21c75f8 100644 (file)
@@ -17,7 +17,6 @@
 
 import functools
 import itertools
-import logging
 import time
 import uuid
 
@@ -29,6 +28,7 @@ import qpid.messaging.exceptions
 from quantum.openstack.common import cfg
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import amqp as rpc_amqp
 from quantum.openstack.common.rpc import common as rpc_common
 
@@ -41,6 +41,9 @@ qpid_opts = [
     cfg.StrOpt('qpid_port',
                default='5672',
                help='Qpid broker port'),
+    cfg.ListOpt('qpid_hosts',
+                default=['$qpid_hostname:$qpid_port'],
+                help='Qpid HA cluster host:port pairs'),
     cfg.StrOpt('qpid_username',
                default='',
                help='Username for qpid connection'),
@@ -121,7 +124,8 @@ class ConsumerBase(object):
         """Fetch the message and pass it to the callback object"""
         message = self.receiver.fetch()
         try:
-            self.callback(message.content)
+            msg = rpc_common.deserialize_msg(message.content)
+            self.callback(msg)
         except Exception:
             LOG.exception(_("Failed to process message... skipping it."))
         finally:
@@ -274,25 +278,32 @@ class Connection(object):
         self.session = None
         self.consumers = {}
         self.consumer_thread = None
+        self.proxy_callbacks = []
         self.conf = conf
 
+        if server_params and 'hostname' in server_params:
+            # NOTE(russellb) This enables support for cast_to_server.
+            server_params['qpid_hosts'] = [
+                '%s:%d' % (server_params['hostname'],
+                           server_params.get('port', 5672))
+            ]
+
         params = {
-            'hostname': self.conf.qpid_hostname,
-            'port': self.conf.qpid_port,
+            'qpid_hosts': self.conf.qpid_hosts,
             'username': self.conf.qpid_username,
             'password': self.conf.qpid_password,
         }
         params.update(server_params or {})
 
-        self.broker = params['hostname'] + ":" + str(params['port'])
+        self.brokers = params['qpid_hosts']
         self.username = params['username']
         self.password = params['password']
-        self.connection_create()
+        self.connection_create(self.brokers[0])
         self.reconnect()
 
-    def connection_create(self):
+    def connection_create(self, broker):
         # Create the connection - this does not open the connection
-        self.connection = qpid.messaging.Connection(self.broker)
+        self.connection = qpid.messaging.Connection(broker)
 
         # Check if flags are set and if so set them for the connection
         # before we call open
@@ -320,10 +331,14 @@ class Connection(object):
             except qpid.messaging.exceptions.ConnectionError:
                 pass
 
+        attempt = 0
         delay = 1
         while True:
+            broker = self.brokers[attempt % len(self.brokers)]
+            attempt += 1
+
             try:
-                self.connection_create()
+                self.connection_create(broker)
                 self.connection.open()
             except qpid.messaging.exceptions.ConnectionError, e:
                 msg_dict = dict(e=e, delay=delay)
@@ -333,10 +348,9 @@ class Connection(object):
                 time.sleep(delay)
                 delay = min(2 * delay, 60)
             else:
+                LOG.info(_('Connected to AMQP server on %s'), broker)
                 break
 
-        LOG.info(_('Connected to AMQP server on %s'), self.broker)
-
         self.session = self.connection.session()
 
         if self.consumers:
@@ -362,12 +376,14 @@ class Connection(object):
     def close(self):
         """Close/release this connection"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.connection.close()
         self.connection = None
 
     def reset(self):
         """Reset a connection so it can be used again"""
         self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
         self.session.close()
         self.session = self.connection.session()
         self.consumers = {}
@@ -422,6 +438,11 @@ class Connection(object):
                 pass
             self.consumer_thread = None
 
+    def wait_on_proxy_callbacks(self):
+        """Wait for all proxy callback threads to exit."""
+        for proxy_cb in self.proxy_callbacks:
+            proxy_cb.wait()
+
     def publisher_send(self, cls, topic, msg):
         """Send to a publisher based on the publisher class"""
 
@@ -497,6 +518,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
 
         if fanout:
             consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -512,6 +534,7 @@ class Connection(object):
         proxy_cb = rpc_amqp.ProxyCallback(
             self.conf, proxy,
             rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
 
         consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
                                  name=pool_name)
@@ -570,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
         rpc_amqp.get_connection_pool(conf, Connection))
 
 
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
     """Sends a notification event on a topic."""
     return rpc_amqp.notify(conf, context, topic, msg,
-                           rpc_amqp.get_connection_pool(conf, Connection))
+                           rpc_amqp.get_connection_pool(conf, Connection),
+                           envelope)
 
 
 def cleanup():
index 02a44d21ba01c34d4361d1f70abf79b87664b597..efee8461cb8f947a7ed7312e278befeee7ecab74 100644 (file)
@@ -205,7 +205,9 @@ class ZmqClient(object):
     def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
-    def cast(self, msg_id, topic, data):
+    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+        if serialize:
+            data = rpc_common.serialize_msg(data, force_envelope)
         self.outq.send([str(msg_id), str(topic), str('cast'),
                         _serialize(data)])
 
@@ -250,7 +252,7 @@ class InternalContext(object):
         """Process a curried message and cast the result to topic."""
         LOG.debug(_("Running func with context: %s"), ctx.to_dict())
         data.setdefault('version', None)
-        data.setdefault('args', [])
+        data.setdefault('args', {})
 
         try:
             result = proxy.dispatch(
@@ -259,7 +261,14 @@ class InternalContext(object):
         except greenlet.GreenletExit:
             # ignore these since they are just from shutdowns
             pass
+        except rpc_common.ClientException, e:
+            LOG.debug(_("Expected exception during message handling (%s)") %
+                      e._exc_info[1])
+            return {'exc':
+                    rpc_common.serialize_remote_exception(e._exc_info,
+                                                          log_failure=False)}
         except Exception:
+            LOG.error(_("Exception during message handling"))
             return {'exc':
                     rpc_common.serialize_remote_exception(sys.exc_info())}
 
@@ -314,7 +323,7 @@ class ConsumerBase(object):
             return
 
         data.setdefault('version', None)
-        data.setdefault('args', [])
+        data.setdefault('args', {})
         proxy.dispatch(ctx, data['version'],
                        data['method'], **data['args'])
 
@@ -426,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
             sock_type = zmq.PUB
         elif topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
-            inside = _deserialize(in_msg)
+            inside = rpc_common.deserialize_msg(_deserialize(in_msg))
             msg_id = inside[-1]['args']['msg_id']
             response = inside[-1]['args']['response']
             LOG.debug(_("->response->%s"), response)
@@ -473,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
 
         msg_id, topic, style, in_msg = data
 
-        ctx, request = _deserialize(in_msg)
+        ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
         ctx = RpcContext.unmarshal(ctx)
 
         proxy = self.proxies[sock]
@@ -524,7 +533,8 @@ class Connection(rpc_common.Connection):
         self.reactor.consume_in_thread()
 
 
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+          force_envelope=False):
     timeout_cast = timeout or CONF.rpc_cast_timeout
     payload = [RpcContext.marshal(context), msg]
 
@@ -533,7 +543,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
             conn = ZmqClient(addr)
 
             # assumes cast can't return an exception
-            conn.cast(msg_id, topic, payload)
+            conn.cast(msg_id, topic, payload, serialize, force_envelope)
         except zmq.ZMQError:
             raise RPCException("Cast failed. ZMQ Socket Exception")
         finally:
@@ -602,7 +612,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     return responses[-1]
 
 
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+                force_envelope=False):
     """
     Wraps the sending of messages,
     dispatches to the matchmaker and sends
@@ -628,7 +639,8 @@ def _multi_send(method, context, topic, msg, timeout=None):
 
         if method.__name__ == '_cast':
             eventlet.spawn_n(method, _addr, context,
-                             _topic, _topic, msg, timeout)
+                             _topic, _topic, msg, timeout, serialize,
+                             force_envelope)
             return
         return method(_addr, context, _topic, _topic, msg, timeout)
 
@@ -669,6 +681,8 @@ def notify(conf, context, topic, msg, **kwargs):
     # NOTE(ewindisch): dot-priority in rpc notifier does not
     # work with our assumptions.
     topic.replace('.', '-')
+    kwargs['serialize'] = kwargs.pop('envelope')
+    kwargs['force_envelope'] = True
     cast(conf, context, topic, msg, **kwargs)
 
 
index ecb54eb8ce4dea9cf102bc8561d7dccb5c07612a..3182d37ed5faf3decbd1a7f2b69539f94f35fd74 100644 (file)
@@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
 import contextlib
 import itertools
 import json
-import logging
 
 from quantum.openstack.common import cfg
 from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
 
 
 matchmaker_opts = [
index 30ffaaec4d95740f829f636656726c95ff508519..a239d8d2ba083c9dd8751bf62a9e3f3905570eef 100644 (file)
@@ -57,6 +57,11 @@ class Service(service.Service):
 
         self.conn.create_consumer(self.topic, dispatcher, fanout=True)
 
+        # Hook to allow the manager to do other initializations after
+        # the rpc connection is created.
+        if callable(getattr(self.manager, 'initialize_service_hook', None)):
+            self.manager.initialize_service_hook(self)
+
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
 
index d967e7095b4e06d0d8c53c1db37ac318b3695d91..9e34d9ff12594b5159cdb626c19b38c0e018ec55 100644 (file)
@@ -27,7 +27,7 @@ import sys
 import time
 
 import eventlet
-import greenlet
+import extras
 import logging as std_logging
 
 from quantum.openstack.common import cfg
@@ -36,11 +36,8 @@ from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import log as logging
 from quantum.openstack.common import threadgroup
 
-try:
-    from quantum.openstack.common import rpc
-except ImportError:
-    rpc = None
 
+rpc = extras.try_import('quantum.openstack.common.rpc')
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
@@ -54,7 +51,7 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services = []
+        self._services = threadgroup.ThreadGroup('launcher')
         eventlet_backdoor.initialize_if_enabled()
 
     @staticmethod
@@ -75,8 +72,7 @@ class Launcher(object):
         :returns: None
 
         """
-        gt = eventlet.spawn(self.run_service, service)
-        self._services.append(gt)
+        self._services.add_thread(self.run_service, service)
 
     def stop(self):
         """Stop all services which are currently running.
@@ -84,8 +80,7 @@ class Launcher(object):
         :returns: None
 
         """
-        for service in self._services:
-            service.kill()
+        self._services.stop()
 
     def wait(self):
         """Waits until all services have been stopped, and then returns.
@@ -93,11 +88,7 @@ class Launcher(object):
         :returns: None
 
         """
-        for service in self._services:
-            try:
-                service.wait()
-            except greenlet.GreenletExit:
-                pass
+        self._services.wait()
 
 
 class SignalExit(SystemExit):
@@ -132,9 +123,9 @@ class ServiceLauncher(Launcher):
         except SystemExit as exc:
             status = exc.code
         finally:
-            self.stop()
             if rpc:
                 rpc.cleanup()
+            self.stop()
         return status
 
 
@@ -252,7 +243,10 @@ class ProcessLauncher(object):
 
     def _wait_child(self):
         try:
-            pid, status = os.wait()
+            # Don't block if no child processes have exited
+            pid, status = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                return None
         except OSError as exc:
             if exc.errno not in (errno.EINTR, errno.ECHILD):
                 raise
@@ -260,10 +254,12 @@ class ProcessLauncher(object):
 
         if os.WIFSIGNALED(status):
             sig = os.WTERMSIG(status)
-            LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+                     dict(pid=pid, sig=sig))
         else:
             code = os.WEXITSTATUS(status)
-            LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+            LOG.info(_('Child %(pid)s exited with status %(code)d'),
+                     dict(pid=pid, code=code))
 
         if pid not in self.children:
             LOG.warning(_('pid %d not in child list'), pid)
@@ -282,6 +278,10 @@ class ProcessLauncher(object):
         while self.running:
             wrap = self._wait_child()
             if not wrap:
+                # Yield to other threads if no children have exited
+                # Sleep for a short time to avoid excessive CPU usage
+                # (see bug #1095346)
+                eventlet.greenthread.sleep(.01)
                 continue
 
             while self.running and len(wrap.children) < wrap.workers:
@@ -309,8 +309,8 @@ class ProcessLauncher(object):
 class Service(object):
     """Service object for binaries running on hosts."""
 
-    def __init__(self):
-        self.tg = threadgroup.ThreadGroup('service')
+    def __init__(self, threads=1000):
+        self.tg = threadgroup.ThreadGroup('service', threads)
 
     def start(self):
         pass
index 83eef07a7be54ea67c3975fa1d7f050de23dcf87..ec37a7f83a08d90f8a01c1b60835d32c4f1365f3 100644 (file)
@@ -117,8 +117,12 @@ def write_requirements():
 
 
 def _run_shell_command(cmd):
-    output = subprocess.Popen(["/bin/sh", "-c", cmd],
-                              stdout=subprocess.PIPE)
+    if os.name == 'nt':
+        output = subprocess.Popen(["cmd.exe", "/C", cmd],
+                                  stdout=subprocess.PIPE)
+    else:
+        output = subprocess.Popen(["/bin/sh", "-c", cmd],
+                                  stdout=subprocess.PIPE)
     out = output.communicate()
     if len(out) == 0:
         return None
@@ -272,6 +276,9 @@ def get_cmdclass():
         from sphinx.setup_command import BuildDoc
 
         class LocalBuildDoc(BuildDoc):
+
+            builders = ['html', 'man']
+
             def generate_autoindex(self):
                 print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
                 modules = {}
@@ -307,14 +314,19 @@ def get_cmdclass():
                 if not os.getenv('SPHINX_DEBUG'):
                     self.generate_autoindex()
 
-                for builder in ['html', 'man']:
+                for builder in self.builders:
                     self.builder = builder
                     self.finalize_options()
                     self.project = self.distribution.get_name()
                     self.version = self.distribution.get_version()
                     self.release = self.distribution.get_version()
                     BuildDoc.run(self)
+
+        class LocalBuildLatex(LocalBuildDoc):
+            builders = ['latex']
+
         cmdclass['build_sphinx'] = LocalBuildDoc
+        cmdclass['build_sphinx_latex'] = LocalBuildLatex
     except ImportError:
         pass
 
index 0aea30ff62731d57e29fbc80abe8e77eea29f174..d1e12715e8a1b788e9fa7fb49085857b451971f4 100644 (file)
@@ -18,7 +18,6 @@ from eventlet import greenlet
 from eventlet import greenpool
 from eventlet import greenthread
 
-from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import log as logging
 from quantum.openstack.common import loopingcall
 
@@ -27,19 +26,17 @@ LOG = logging.getLogger(__name__)
 
 
 def _thread_done(gt, *args, **kwargs):
-    '''
-    Callback function to be passed to GreenThread.link() when we spawn()
-    Calls the ThreadGroup to notify if.
-    '''
+    """ Callback function to be passed to GreenThread.link() when we spawn()
+    Calls the :class:`ThreadGroup` to notify it.
+
+    """
     kwargs['group'].thread_done(kwargs['thread'])
 
 
 class Thread(object):
-    """
-    Wrapper around a greenthread, that holds a reference to
-    the ThreadGroup. The Thread will notify the ThreadGroup
-    when it has done so it can be removed from the threads
-    list.
+    """ Wrapper around a greenthread, that holds a reference to the
+    :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
+    it has finished so it can be removed from the threads list.
     """
     def __init__(self, name, thread, group):
         self.name = name
@@ -54,11 +51,11 @@ class Thread(object):
 
 
 class ThreadGroup(object):
-    """
-    The point of this class is to:
-    - keep track of timers and greenthreads (making it easier to stop them
+    """ The point of the ThreadGroup class is to:
+
+    * keep track of timers and greenthreads (making it easier to stop them
       when need be).
-    - provide an easy API to add timers.
+    * provide an easy API to add timers.
     """
     def __init__(self, name, thread_pool_size=10):
         self.name = name
index 86004391de06a7144b6dce653344880bc93ccbf0..0f346087f78dbeae8f329efbfc9731c155f2d8ad 100644 (file)
@@ -71,11 +71,15 @@ def normalize_time(timestamp):
 
 def is_older_than(before, seconds):
     """Return True if before is older than seconds."""
+    if isinstance(before, basestring):
+        before = parse_strtime(before).replace(tzinfo=None)
     return utcnow() - before > datetime.timedelta(seconds=seconds)
 
 
 def is_newer_than(after, seconds):
     """Return True if after is newer than seconds."""
+    if isinstance(after, basestring):
+        after = parse_strtime(after).replace(tzinfo=None)
     return after - utcnow() > datetime.timedelta(seconds=seconds)
 
 
@@ -87,7 +91,10 @@ def utcnow_ts():
 def utcnow():
     """Overridable version of utils.utcnow."""
     if utcnow.override_time:
-        return utcnow.override_time
+        try:
+            return utcnow.override_time.pop(0)
+        except AttributeError:
+            return utcnow.override_time
     return datetime.datetime.utcnow()
 
 
@@ -95,14 +102,21 @@ utcnow.override_time = None
 
 
 def set_time_override(override_time=datetime.datetime.utcnow()):
-    """Override utils.utcnow to return a constant time."""
+    """
+    Override utils.utcnow to return a constant time or a list thereof,
+    one at a time.
+    """
     utcnow.override_time = override_time
 
 
 def advance_time_delta(timedelta):
     """Advance overridden time using a datetime.timedelta."""
     assert(not utcnow.override_time is None)
-    utcnow.override_time += timedelta
+    try:
+        for dt in utcnow.override_time:
+            dt += timedelta
+    except TypeError:
+        utcnow.override_time += timedelta
 
 
 def advance_time_seconds(seconds):
@@ -135,3 +149,16 @@ def unmarshall_time(tyme):
                              minute=tyme['minute'],
                              second=tyme['second'],
                              microsecond=tyme['microsecond'])
+
+
+def delta_seconds(before, after):
+    """
+    Compute the difference in seconds between two date, time, or
+    datetime objects (as a float, to microsecond resolution).
+    """
+    delta = after - before
+    try:
+        return delta.total_seconds()
+    except AttributeError:
+        return ((delta.days * 24 * 3600) + delta.seconds +
+                float(delta.microseconds) / (10 ** 6))
index a19e4226528d7ee1c47a664d76780a897838edc8..c04a695ff4fbab70da440d50453393cbe9cd6cc8 100644 (file)
@@ -24,19 +24,6 @@ import pkg_resources
 import setup
 
 
-class _deferred_version_string(object):
-    """Internal helper class which provides delayed version calculation."""
-    def __init__(self, version_info, prefix):
-        self.version_info = version_info
-        self.prefix = prefix
-
-    def __str__(self):
-        return "%s%s" % (self.prefix, self.version_info.version_string())
-
-    def __repr__(self):
-        return "%s%s" % (self.prefix, self.version_info.version_string())
-
-
 class VersionInfo(object):
 
     def __init__(self, package, python_package=None, pre_version=None):
@@ -57,14 +44,15 @@ class VersionInfo(object):
             self.python_package = python_package
         self.pre_version = pre_version
         self.version = None
+        self._cached_version = None
 
     def _generate_version(self):
         """Defer to the openstack.common.setup routines for making a
         version from git."""
         if self.pre_version is None:
-            return setup.get_post_version(self.python_package)
+            return setup.get_post_version(self.package)
         else:
-            return setup.get_pre_version(self.python_package, self.pre_version)
+            return setup.get_pre_version(self.package, self.pre_version)
 
     def _newer_version(self, pending_version):
         """Check to see if we're working with a stale version or not.
@@ -138,11 +126,14 @@ class VersionInfo(object):
         else:
             return '%s-dev' % (version_parts[0],)
 
-    def deferred_version_string(self, prefix=""):
+    def cached_version_string(self, prefix=""):
         """Generate an object which will expand in a string context to
         the results of version_string(). We do this so that don't
         call into pkg_resources every time we start up a program when
         passing version information into the CONF constructor, but
         rather only do the calculation when and if a version is requested
         """
-        return _deferred_version_string(self, prefix)
+        if not self._cached_version:
+            self._cached_version = "%s%s" % (prefix,
+                                             self.version_string())
+        return self._cached_version
index ff31a51410f12631fdaae2c7806811ef7b8dedc7..331b5beb9b060bbe2c2f7189a21c4cd35eff83ec 100644 (file)
@@ -5,6 +5,7 @@ amqplib==0.6.1
 anyjson>=0.2.4
 argparse
 eventlet>=0.9.17
+extras
 greenlet>=0.3.1
 httplib2
 iso8601>=0.1.4