]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Update openstack-common
authorAngus Salkeld <asalkeld@redhat.com>
Thu, 13 Sep 2012 03:50:24 +0000 (13:50 +1000)
committerAngus Salkeld <asalkeld@redhat.com>
Thu, 13 Sep 2012 03:50:24 +0000 (13:50 +1000)
Change-Id: I002574a60b4f59543bc6aa73256c2f0b3b79d378
Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
19 files changed:
heat/openstack/common/cfg.py
heat/openstack/common/exception.py
heat/openstack/common/excutils.py
heat/openstack/common/importutils.py
heat/openstack/common/jsonutils.py
heat/openstack/common/log.py
heat/openstack/common/notifier/api.py
heat/openstack/common/notifier/log_notifier.py
heat/openstack/common/notifier/rabbit_notifier.py
heat/openstack/common/rpc/__init__.py
heat/openstack/common/rpc/amqp.py
heat/openstack/common/rpc/common.py
heat/openstack/common/rpc/impl_kombu.py
heat/openstack/common/rpc/impl_qpid.py
heat/openstack/common/rpc/impl_zmq.py
heat/openstack/common/rpc/proxy.py
heat/openstack/common/setup.py
heat/openstack/common/timeutils.py
heat/openstack/common/utils.py

index 6785766cb4679978f68397eadfdbc064eeb89a1d..53ed9ce2128b8d89defe085e314863ee9522b0c7 100644 (file)
@@ -367,6 +367,11 @@ class ConfigFileValueError(Error):
     pass
 
 
+def _fixpath(p):
+    """Apply tilde expansion and absolutization to a path."""
+    return os.path.abspath(os.path.expanduser(p))
+
+
 def _get_config_dirs(project=None):
     """Return a list of directories where config files may be located.
 
@@ -384,11 +389,9 @@ def _get_config_dirs(project=None):
       ~/
       /etc/
     """
-    fix_path = lambda p: os.path.abspath(os.path.expanduser(p))
-
     cfg_dirs = [
-        fix_path(os.path.join('~', '.' + project)) if project else None,
-        fix_path('~'),
+        _fixpath(os.path.join('~', '.' + project)) if project else None,
+        _fixpath('~'),
         os.path.join('/etc', project) if project else None,
         '/etc'
     ]
@@ -464,7 +467,7 @@ def _is_opt_registered(opts, opt):
     :raises: DuplicateOptError if a naming conflict is detected
     """
     if opt.dest in opts:
-        if opts[opt.dest]['opt'] is not opt:
+        if opts[opt.dest]['opt'] != opt:
             raise DuplicateOptError(opt.name)
         return True
     else:
@@ -527,6 +530,9 @@ class Opt(object):
         else:
             self.deprecated_name = None
 
+    def __ne__(self, another):
+        return vars(self) != vars(another)
+
     def _get_from_config_parser(self, cparser, section):
         """Retrieves the option value from a MultiConfigParser object.
 
@@ -806,7 +812,7 @@ class OptGroup(object):
         if _is_opt_registered(self._opts, opt):
             return False
 
-        self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None}
+        self._opts[opt.dest] = {'opt': opt}
 
         return True
 
@@ -1084,7 +1090,7 @@ class ConfigOpts(collections.Mapping):
         if _is_opt_registered(self._opts, opt):
             return False
 
-        self._opts[opt.dest] = {'opt': opt, 'override': None, 'default': None}
+        self._opts[opt.dest] = {'opt': opt}
 
         return True
 
@@ -1153,6 +1159,25 @@ class ConfigOpts(collections.Mapping):
         for opt in opts:
             self.unregister_opt(opt, group, clear_cache=False)
 
+    def import_opt(self, name, module_str, group=None):
+        """Import an option definition from a module.
+
+        Import a module and check that a given option is registered.
+
+        This is intended for use with global configuration objects
+        like cfg.CONF where modules commonly register options with
+        CONF at module load time. If one module requires an option
+        defined by another module it can use this method to explicitly
+        declare the dependency.
+
+        :param name: the name/dest of the opt
+        :param module_str: the name of a module to import
+        :param group: an option OptGroup object or group name
+        :raises: NoSuchOptError, NoSuchGroupError
+        """
+        __import__(module_str)
+        self._get_opt_info(name, group)
+
     @__clear_cache
     def set_override(self, name, override, group=None):
         """Override an opt value.
@@ -1183,6 +1208,33 @@ class ConfigOpts(collections.Mapping):
         opt_info = self._get_opt_info(name, group)
         opt_info['default'] = default
 
+    @__clear_cache
+    def clear_override(self, name, group=None):
+        """Clear an override of an opt value.
+
+        Clear a previously set override of the command line, config file
+        and default values of a given option.
+
+        :param name: the name/dest of the opt
+        :param group: an option OptGroup object or group name
+        :raises: NoSuchOptError, NoSuchGroupError
+        """
+        opt_info = self._get_opt_info(name, group)
+        opt_info.pop('override', None)
+
+    @__clear_cache
+    def clear_default(self, name, group=None):
+        """Clear an override of an opt's default value.
+
+        Clear a previously set override of the default value of given option.
+
+        :param name: the name/dest of the opt
+        :param group: an option OptGroup object or group name
+        :raises: NoSuchOptError, NoSuchGroupError
+        """
+        opt_info = self._get_opt_info(name, group)
+        opt_info.pop('default', None)
+
     def _all_opt_infos(self):
         """A generator function for iterating over opt infos."""
         for info in self._opts.values():
@@ -1199,8 +1251,8 @@ class ConfigOpts(collections.Mapping):
     def _unset_defaults_and_overrides(self):
         """Unset any default or override on all options."""
         for info, group in self._all_opt_infos():
-            info['default'] = None
-            info['override'] = None
+            info.pop('default', None)
+            info.pop('override', None)
 
     def disable_interspersed_args(self):
         """Set parsing to stop on the first non-option.
@@ -1246,10 +1298,10 @@ class ConfigOpts(collections.Mapping):
         """
         dirs = []
         if self.config_dir:
-            dirs.append(self.config_dir)
+            dirs.append(_fixpath(self.config_dir))
 
         for cf in reversed(self.config_file):
-            dirs.append(os.path.dirname(cf))
+            dirs.append(os.path.dirname(_fixpath(cf)))
 
         dirs.extend(_get_config_dirs(self.project))
 
@@ -1323,10 +1375,10 @@ class ConfigOpts(collections.Mapping):
             return self.GroupAttr(self, self._get_group(name))
 
         info = self._get_opt_info(name, group)
-        default, opt, override = [info[k] for k in sorted(info.keys())]
+        opt = info['opt']
 
-        if override is not None:
-            return override
+        if 'override' in info:
+            return info['override']
 
         values = []
         if self._cparser is not None:
@@ -1354,8 +1406,8 @@ class ConfigOpts(collections.Mapping):
         if values:
             return values
 
-        if default is not None:
-            return default
+        if 'default' in info:
+            return info['default']
 
         return opt.default
 
@@ -1430,6 +1482,8 @@ class ConfigOpts(collections.Mapping):
             config_dir_glob = os.path.join(self.config_dir, '*.conf')
             config_files += sorted(glob.glob(config_dir_glob))
 
+        config_files = [_fixpath(p) for p in config_files]
+
         self._cparser = MultiConfigParser()
 
         try:
@@ -1447,10 +1501,10 @@ class ConfigOpts(collections.Mapping):
         :raises: RequiredOptError
         """
         for info, group in self._all_opt_infos():
-            default, opt, override = [info[k] for k in sorted(info.keys())]
+            opt = info['opt']
 
             if opt.required:
-                if (default is not None or override is not None):
+                if ('default' in info or 'override' in info):
                     continue
 
                 if self._get(opt.name, group) is None:
index e5da94b9496518923bf69507a22065589e7f4bfa..ba32da550b01a1b353becc043b544fecde474eb7 100644 (file)
@@ -19,7 +19,6 @@
 Exceptions common to OpenStack projects
 """
 
-import itertools
 import logging
 
 
index 67c9fa951184c2ac5a79d3caf4f9c0eab80938e4..5dd48301760e4778904bdc32f2d748e0e9ef7980 100644 (file)
@@ -30,14 +30,14 @@ def save_and_reraise_exception():
     """Save current exception, run some code and then re-raise.
 
     In some cases the exception context can be cleared, resulting in None
-    being attempted to be reraised after an exception handler is run. This
+    being attempted to be re-raised after an exception handler is run. This
     can happen when eventlet switches greenthreads or when running an
     exception handler, code raises and catches an exception. In both
     cases the exception context will be cleared.
 
     To work around this, we save the exception state, run handler code, and
     then re-raise the original exception. If another exception occurs, the
-    saved exception is logged and the new exception is reraised.
+    saved exception is logged and the new exception is re-raised.
     """
     type_, value, tb = sys.exc_info()
     try:
index 2fbb0291a06d97182e6c1717da125fe3888623a6..f45372b4dba607251b5a79795be42d94b94da8f9 100644 (file)
@@ -29,7 +29,7 @@ def import_class(import_str):
     try:
         __import__(mod_str)
         return getattr(sys.modules[mod_str], class_str)
-    except (ImportError, ValueError, AttributeError), exc:
+    except (ValueError, AttributeError), exc:
         raise ImportError('Class %s cannot be found (%s)' %
                           (class_str,
                            traceback.format_exception(*sys.exc_info())))
index 845023eba89088090b4c4beee626699dc9f79481..2566fd113354a39dd6d6f0b9013ec37ad67cda9f 100644 (file)
@@ -107,9 +107,11 @@ def to_primitive(value, convert_instances=False, level=0):
         elif hasattr(value, 'iteritems'):
             return to_primitive(dict(value.iteritems()),
                                 convert_instances=convert_instances,
-                                level=level)
+                                level=level + 1)
         elif hasattr(value, '__iter__'):
-            return to_primitive(list(value), level)
+            return to_primitive(list(value),
+                                convert_instances=convert_instances,
+                                level=level)
         elif convert_instances and hasattr(value, '__dict__'):
             # Likely an instance of something. Watch for cycles.
             # Ignore class member vars.
index e4c808fd182eda628eae82fad0fb8230bd484409..41e3781ce8b62a667fb2cd56bb3aaeb7042f7432 100644 (file)
@@ -54,25 +54,24 @@ log_opts = [
                        '%(message)s',
                help='format string to use for log messages with context'),
     cfg.StrOpt('logging_default_format_string',
-               default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s'
-                       '%(message)s',
+               default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
+                       ' %(instance)s%(message)s',
                help='format string to use for log messages without context'),
     cfg.StrOpt('logging_debug_format_suffix',
-               default='from (pid=%(process)d) %(funcName)s '
-                       '%(pathname)s:%(lineno)d',
+               default='%(funcName)s %(pathname)s:%(lineno)d',
                help='data to append to log format when level is DEBUG'),
     cfg.StrOpt('logging_exception_prefix',
-               default='%(asctime)s TRACE %(name)s %(instance)s',
+               default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
                help='prefix each line of exception output with this format'),
     cfg.ListOpt('default_log_levels',
                 default=[
-                  'amqplib=WARN',
-                  'sqlalchemy=WARN',
-                  'boto=WARN',
-                  'suds=INFO',
-                  'keystone=INFO',
-                  'eventlet.wsgi.server=WARN'
-                  ],
+                    'amqplib=WARN',
+                    'sqlalchemy=WARN',
+                    'boto=WARN',
+                    'suds=INFO',
+                    'keystone=INFO',
+                    'eventlet.wsgi.server=WARN'
+                ],
                 help='list of logger=LEVEL pairs'),
     cfg.BoolOpt('publish_errors',
                 default=False,
@@ -89,7 +88,7 @@ log_opts = [
                default='[instance: %(uuid)s] ',
                help='If an instance UUID is passed with the log message, '
                     'format it like this'),
-    ]
+]
 
 
 generic_log_opts = [
@@ -105,7 +104,7 @@ generic_log_opts = [
     cfg.StrOpt('logfile_mode',
                default='0644',
                help='Default file mode used when creating log files'),
-    ]
+]
 
 
 CONF = cfg.CONF
@@ -208,9 +207,9 @@ class JSONFormatter(logging.Formatter):
     def formatException(self, ei, strip_newlines=True):
         lines = traceback.format_exception(*ei)
         if strip_newlines:
-            lines = [itertools.ifilter(lambda x: x,
-                                      line.rstrip().splitlines())
-                    for line in lines]
+            lines = [itertools.ifilter(
+                lambda x: x,
+                line.rstrip().splitlines()) for line in lines]
             lines = list(itertools.chain(*lines))
         return lines
 
@@ -247,26 +246,27 @@ class JSONFormatter(logging.Formatter):
 
 class PublishErrorsHandler(logging.Handler):
     def emit(self, record):
-        if 'list_notifier_drivers' in CONF:
-            if ('heat.openstack.common.notifier.log_notifier' in
-                CONF.list_notifier_drivers):
-                return
+        if ('heat.openstack.common.notifier.log_notifier' in
+            CONF.notification_driver):
+            return
         notifier.api.notify(None, 'error.publisher',
-                                 'error_notification',
-                                 notifier.api.ERROR,
-                                 dict(error=record.msg))
+                            'error_notification',
+                            notifier.api.ERROR,
+                            dict(error=record.msg))
 
 
-def handle_exception(type, value, tb):
-    extra = {}
-    if CONF.verbose:
-        extra['exc_info'] = (type, value, tb)
-    getLogger().critical(str(value), **extra)
+def _create_logging_excepthook(product_name):
+    def logging_excepthook(type, value, tb):
+        extra = {}
+        if CONF.verbose:
+            extra['exc_info'] = (type, value, tb)
+        getLogger(product_name).critical(str(value), **extra)
+    return logging_excepthook
 
 
 def setup(product_name):
     """Setup logging."""
-    sys.excepthook = handle_exception
+    sys.excepthook = _create_logging_excepthook(product_name)
 
     if CONF.log_config:
         try:
@@ -357,17 +357,6 @@ def _setup_logging_from_conf(product_name):
         for handler in log_root.handlers:
             logger.addHandler(handler)
 
-    # NOTE(jkoelker) Clear the handlers for the root logger that was setup
-    #                by basicConfig in nova/__init__.py and install the
-    #                NullHandler.
-    root = logging.getLogger()
-    for handler in root.handlers:
-        root.removeHandler(handler)
-    handler = NullHandler()
-    handler.setFormatter(logging.Formatter())
-    root.addHandler(handler)
-
-
 _loggers = {}
 
 
@@ -405,8 +394,12 @@ class LegacyFormatter(logging.Formatter):
 
     def format(self, record):
         """Uses contextstring if request_id is set, otherwise default."""
-        if 'instance' not in record.__dict__:
-            record.__dict__['instance'] = ''
+        # NOTE(sdague): default the fancier formatting params
+        # to an empty string so we don't throw an exception if
+        # they get used
+        for key in ('instance', 'color'):
+            if key not in record.__dict__:
+                record.__dict__[key] = ''
 
         if record.__dict__.get('request_id', None):
             self._fmt = CONF.logging_context_format_string
index b18fff770fd51baa7e9fc4fc15b6dcde9770005d..e6383f610ecf5e6044b1fcc1574465ae4ad92306 100644 (file)
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import inspect
 import uuid
 
 from heat.openstack.common import cfg
@@ -28,16 +27,17 @@ from heat.openstack.common import timeutils
 LOG = logging.getLogger(__name__)
 
 notifier_opts = [
-    cfg.StrOpt('notification_driver',
-               default='heat.openstack.common.notifier.no_op_notifier',
-               help='Default driver for sending notifications'),
+    cfg.MultiStrOpt('notification_driver',
+                    default=[],
+                    deprecated_name='list_notifier_drivers',
+                    help='Driver or drivers to handle sending notifications'),
     cfg.StrOpt('default_notification_level',
                default='INFO',
                help='Default notification level for outgoing notifications'),
     cfg.StrOpt('default_publisher_id',
                default='$host',
                help='Default publisher_id for outgoing notifications'),
-    ]
+]
 
 CONF = cfg.CONF
 CONF.register_opts(notifier_opts)
@@ -122,21 +122,60 @@ def notify(context, publisher_id, event_type, priority, payload):
     """
     if priority not in log_levels:
         raise BadPriorityException(
-                 _('%s not in valid priorities') % priority)
+            _('%s not in valid priorities') % priority)
 
     # Ensure everything is JSON serializable.
     payload = jsonutils.to_primitive(payload, convert_instances=True)
 
-    driver = importutils.import_module(CONF.notification_driver)
     msg = dict(message_id=str(uuid.uuid4()),
-                   publisher_id=publisher_id,
-                   event_type=event_type,
-                   priority=priority,
-                   payload=payload,
-                   timestamp=str(timeutils.utcnow()))
-    try:
-        driver.notify(context, msg)
-    except Exception, e:
-        LOG.exception(_("Problem '%(e)s' attempting to "
-                        "send to notification system. Payload=%(payload)s") %
-                        locals())
+               publisher_id=publisher_id,
+               event_type=event_type,
+               priority=priority,
+               payload=payload,
+               timestamp=str(timeutils.utcnow()))
+
+    for driver in _get_drivers():
+        try:
+            driver.notify(context, msg)
+        except Exception, e:
+            LOG.exception(_("Problem '%(e)s' attempting to "
+              "send to notification system. Payload=%(payload)s") %
+                            locals())
+
+
+_drivers = None
+
+
+def _get_drivers():
+    """Instantiate, cache, and return drivers based on the CONF."""
+    global _drivers
+    if _drivers is None:
+        _drivers = {}
+        for notification_driver in CONF.notification_driver:
+            add_driver(notification_driver)
+
+    return _drivers.values()
+
+
+def add_driver(notification_driver):
+    """Add a notification driver at runtime."""
+    # Make sure the driver list is initialized.
+    _get_drivers()
+    if isinstance(notification_driver, basestring):
+        # Load and add
+        try:
+            driver = importutils.import_module(notification_driver)
+            _drivers[notification_driver] = driver
+        except ImportError as e:
+            LOG.exception(_("Failed to load notifier %s. "
+                            "These notifications will not be sent.") %
+                            notification_driver)
+    else:
+        # Driver is already loaded; just add the object.
+        _drivers[notification_driver] = notification_driver
+
+
+def _reset_drivers():
+    """Used by unit tests to reset the drivers."""
+    global _drivers
+    _drivers = None
index 0d6090d10b9ad87991dcf7cb7d29a020138ce9d8..ef1e65fb9835ca29fe3211263c662ef75d12e28c 100644 (file)
@@ -30,6 +30,6 @@ def notify(_context, message):
                            CONF.default_notification_level)
     priority = priority.lower()
     logger = logging.getLogger(
-            'heat.openstack.common.notification.%s' %
-            message['event_type'])
+        'heat.openstack.common.notification.%s' %
+        message['event_type'])
     getattr(logger, priority)(jsonutils.dumps(message))
index 64b00c006b523b1f0bde1ddc252d930d1480efa7..2ddb32156462c69747a8a6c0c4e25230f86a1bc0 100644 (file)
@@ -22,9 +22,9 @@ from heat.openstack.common import rpc
 
 LOG = logging.getLogger(__name__)
 
-notification_topic_opt = cfg.ListOpt('notification_topics',
-        default=['notifications', ],
-        help='AMQP topic used for openstack notifications')
+notification_topic_opt = cfg.ListOpt(
+    'notification_topics', default=['notifications', ],
+    help='AMQP topic used for openstack notifications')
 
 CONF = cfg.CONF
 CONF.register_opt(notification_topic_opt)
index 28a44c04dbcdbb7fe11783765cd3fabdd78366cc..2eb5f0a7da6057725fb289f63e0fc5ce07323bfa 100644 (file)
@@ -49,15 +49,21 @@ rpc_opts = [
     cfg.ListOpt('allowed_rpc_exception_modules',
                 default=['heat.openstack.common.exception',
                          'nova.exception',
+                         'cinder.exception',
                          ],
                 help='Modules of exceptions that are permitted to be recreated'
                      'upon receiving exception data from an rpc call.'),
-    cfg.StrOpt('control_exchange',
-               default='nova',
-               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
     cfg.BoolOpt('fake_rabbit',
                 default=False,
                 help='If passed, use a fake RabbitMQ provider'),
+    #
+    # The following options are not registered here, but are expected to be
+    # present. The project using this library must register these options with
+    # the configuration so that project-specific defaults may be defined.
+    #
+    #cfg.StrOpt('control_exchange',
+    #           default='nova',
+    #           help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
 ]
 
 cfg.CONF.register_opts(rpc_opts)
index 25df5f99c43f079f2302b2cad9bb3de618ac4a06..2374a13be886c7b6ebd482352be756b14ab3647e 100644 (file)
@@ -34,6 +34,7 @@ from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
 
+from heat.openstack.common import cfg
 from heat.openstack.common import excutils
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import local
@@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool):
 def cleanup(connection_pool):
     if connection_pool:
         connection_pool.empty()
+
+
+def get_control_exchange(conf):
+    try:
+        return conf.control_exchange
+    except cfg.NoSuchOptError:
+        return 'openstack'
index e559c6f1fa3194fd12fe92c5b7a89dfc5ed9caed..7ff4e3170e62b6d5dcf5fdf708520049e53e5b43 100644 (file)
 
 import copy
 import logging
-import sys
 import traceback
 
-from heat.openstack.common import cfg
 from heat.openstack.common.gettextutils import _
 from heat.openstack.common import importutils
 from heat.openstack.common import jsonutils
@@ -108,7 +106,7 @@ class Connection(object):
         """
         raise NotImplementedError()
 
-    def create_consumer(self, conf, topic, proxy, fanout=False):
+    def create_consumer(self, topic, proxy, fanout=False):
         """Create a consumer on this connection.
 
         A consumer is associated with a message queue on the backend message
@@ -117,7 +115,6 @@ class Connection(object):
         off of the queue will determine which method gets called on the proxy
         object.
 
-        :param conf:  An openstack.common.cfg configuration object.
         :param topic: This is a name associated with what to consume from.
                       Multiple instances of a service may consume from the same
                       topic. For example, all instances of nova-compute consume
@@ -133,7 +130,7 @@ class Connection(object):
         """
         raise NotImplementedError()
 
-    def create_worker(self, conf, topic, proxy, pool_name):
+    def create_worker(self, topic, proxy, pool_name):
         """Create a worker on this connection.
 
         A worker is like a regular consumer of messages directed to a
@@ -143,7 +140,6 @@ class Connection(object):
         be asked to process it. Load is distributed across the members
         of the pool in round-robin fashion.
 
-        :param conf:  An openstack.common.cfg configuration object.
         :param topic: This is a name associated with what to consume from.
                       Multiple instances of a service may consume from the same
                       topic.
index 20003d77e4d1b07cd5311a1a99b183998991d612..b5209a407f57a696928c85bda24adba488aaaa98 100644 (file)
@@ -210,10 +210,10 @@ class TopicConsumer(ConsumerBase):
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        exchange = kombu.entity.Exchange(name=conf.control_exchange,
-                                         type='topic',
-                                         durable=options['durable'],
-                                         auto_delete=options['auto_delete'])
+        exchange = kombu.entity.Exchange(
+                name=rpc_amqp.get_control_exchange(conf),
+                type='topic', durable=options['durable'],
+                auto_delete=options['auto_delete'])
         super(TopicConsumer, self).__init__(channel,
                                             callback,
                                             tag,
@@ -307,8 +307,9 @@ class TopicPublisher(Publisher):
                    'auto_delete': False,
                    'exclusive': False}
         options.update(kwargs)
-        super(TopicPublisher, self).__init__(channel, conf.control_exchange,
-                                             topic, type='topic', **options)
+        super(TopicPublisher, self).__init__(channel,
+                rpc_amqp.get_control_exchange(conf), topic,
+                type='topic', **options)
 
 
 class FanoutPublisher(Publisher):
index d4f4cc77e9ff010443a8b39770be5c97e84a6de3..697f4d8c5d129f9f6d20a426108e7af0081b2c5a 100644 (file)
@@ -181,9 +181,8 @@ class TopicConsumer(ConsumerBase):
         """
 
         super(TopicConsumer, self).__init__(session, callback,
-                                            "%s/%s" % (conf.control_exchange,
-                                                       topic),
-                                            {}, name or topic, {})
+                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
+                {}, name or topic, {})
 
 
 class FanoutConsumer(ConsumerBase):
@@ -256,9 +255,8 @@ class TopicPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
-        super(TopicPublisher, self).__init__(
-            session,
-            "%s/%s" % (conf.control_exchange, topic))
+        super(TopicPublisher, self).__init__(session,
+                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic))
 
 
 class FanoutPublisher(Publisher):
@@ -276,10 +274,9 @@ class NotifyPublisher(Publisher):
     def __init__(self, conf, session, topic):
         """init a 'topic' publisher.
         """
-        super(NotifyPublisher, self).__init__(
-            session,
-            "%s/%s" % (conf.control_exchange, topic),
-            {"durable": True})
+        super(NotifyPublisher, self).__init__(session,
+                "%s/%s" % (rpc_amqp.get_control_exchange(conf), topic),
+                {"durable": True})
 
 
 class Connection(object):
@@ -329,7 +326,7 @@ class Connection(object):
         if self.conf.qpid_reconnect_interval:
             self.connection.reconnect_interval = (
                 self.conf.qpid_reconnect_interval)
-        self.connection.hearbeat = self.conf.qpid_heartbeat
+        self.connection.heartbeat = self.conf.qpid_heartbeat
         self.connection.protocol = self.conf.qpid_protocol
         self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
index 00296dd33660af521626f633812717f1798fe6a7..476e383d5141b4fa3ad350cca8e7bf74ac64a2c6 100644 (file)
@@ -52,7 +52,7 @@ zmq_opts = [
         default=('heat.openstack.common.rpc.'
                  'matchmaker.MatchMakerLocalhost'),
         help='MatchMaker driver',
-        ),
+    ),
 
     # The following port is unassigned by IANA as of 2012-05-21
     cfg.IntOpt('rpc_zmq_port', default=9501,
@@ -72,7 +72,7 @@ zmq_opts = [
 
 # These globals are defined in register_opts(conf),
 # a mandatory initialization call
-FLAGS = None
+CONF = None
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
 matchmaker = None  # memoized matchmaker object
 
@@ -274,7 +274,7 @@ class InternalContext(object):
             ctx.replies)
 
         LOG.debug(_("Sending reply"))
-        cast(FLAGS, ctx, topic, {
+        cast(CONF, ctx, topic, {
             'method': '-process_reply',
             'args': {
                 'msg_id': msg_id,
@@ -329,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase):
     def __init__(self, conf):
         super(ZmqBaseReactor, self).__init__()
 
-        self.conf = conf
         self.mapping = {}
         self.proxies = {}
         self.threads = []
@@ -405,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor):
         super(ZmqProxy, self).__init__(conf)
 
         self.topic_proxy = {}
-        ipc_dir = conf.rpc_zmq_ipc_dir
+        ipc_dir = CONF.rpc_zmq_ipc_dir
 
         self.topic_proxy['zmq_replies'] = \
             ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
@@ -413,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor):
         self.sockets.append(self.topic_proxy['zmq_replies'])
 
     def consume(self, sock):
-        ipc_dir = self.conf.rpc_zmq_ipc_dir
+        ipc_dir = CONF.rpc_zmq_ipc_dir
 
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
@@ -487,7 +486,6 @@ class Connection(rpc_common.Connection):
     """Manages connections and threads."""
 
     def __init__(self, conf):
-        self.conf = conf
         self.reactor = ZmqReactor(conf)
 
     def create_consumer(self, topic, proxy, fanout=False):
@@ -508,7 +506,7 @@ class Connection(rpc_common.Connection):
 
         # Receive messages from (local) proxy
         inaddr = "ipc://%s/zmq_topic_%s" % \
-            (self.conf.rpc_zmq_ipc_dir, topic)
+            (CONF.rpc_zmq_ipc_dir, topic)
 
         LOG.debug(_("Consumer is a zmq.%s"),
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
@@ -527,7 +525,7 @@ class Connection(rpc_common.Connection):
 
 
 def _cast(addr, context, msg_id, topic, msg, timeout=None):
-    timeout_cast = timeout or FLAGS.rpc_cast_timeout
+    timeout_cast = timeout or CONF.rpc_cast_timeout
     payload = [RpcContext.marshal(context), msg]
 
     with Timeout(timeout_cast, exception=rpc_common.Timeout):
@@ -545,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
 
 def _call(addr, context, msg_id, topic, msg, timeout=None):
     # timeout_response is how long we wait for a response
-    timeout = timeout or FLAGS.rpc_response_timeout
+    timeout = timeout or CONF.rpc_response_timeout
 
     # The msg_id is used to track replies.
     msg_id = str(uuid.uuid4().hex)
 
     # Replies always come into the reply service.
-    reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
+    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
 
     LOG.debug(_("Creating payload"))
     # Curry the original request into a reply method.
@@ -573,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     with Timeout(timeout, exception=rpc_common.Timeout):
         try:
             msg_waiter = ZmqSocket(
-                "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
                 zmq.SUB, subscribe=msg_id, bind=False
             )
 
@@ -599,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
     # responses for Exceptions.
     for resp in responses:
         if isinstance(resp, types.DictType) and 'exc' in resp:
-            raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+            raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
 
     return responses[-1]
 
@@ -610,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
     dispatches to the matchmaker and sends
     message to all relevant hosts.
     """
-    conf = FLAGS
+    conf = CONF
     LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
     queues = matchmaker.queues(topic)
@@ -641,26 +639,22 @@ def create_connection(conf, new=True):
 
 def multicall(conf, *args, **kwargs):
     """Multiple calls."""
-    register_opts(conf)
     return _multi_send(_call, *args, **kwargs)
 
 
 def call(conf, *args, **kwargs):
     """Send a message, expect a response."""
-    register_opts(conf)
     data = _multi_send(_call, *args, **kwargs)
     return data[-1]
 
 
 def cast(conf, *args, **kwargs):
     """Send a message expecting no reply."""
-    register_opts(conf)
     _multi_send(_cast, *args, **kwargs)
 
 
 def fanout_cast(conf, context, topic, msg, **kwargs):
     """Send a message to all listening and expect no reply."""
-    register_opts(conf)
     # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
     # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
     _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
@@ -672,7 +666,6 @@ def notify(conf, context, topic, msg, **kwargs):
     Notifications are sent to topic-priority.
     This differs from the AMQP drivers which send to topic.priority.
     """
-    register_opts(conf)
     # NOTE(ewindisch): dot-priority in rpc notifier does not
     # work with our assumptions.
     topic.replace('.', '-')
@@ -684,7 +677,7 @@ def cleanup():
     global ZMQ_CTX
     global matchmaker
     matchmaker = None
-    ZMQ_CTX.destroy()
+    ZMQ_CTX.term()
     ZMQ_CTX = None
 
 
@@ -697,11 +690,11 @@ def register_opts(conf):
     # We memoize through these globals
     global ZMQ_CTX
     global matchmaker
-    global FLAGS
+    global CONF
 
-    if not FLAGS:
+    if not CONF:
         conf.register_opts(zmq_opts)
-        FLAGS = conf
+        CONF = conf
     # Don't re-set, if this method is called twice.
     if not ZMQ_CTX:
         ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
index db96a27486aef839d56e00fdf6c84e7eb21e23d6..99e98b43b1e9f48c8d19588970b88ce490138e8d 100644 (file)
@@ -112,11 +112,12 @@ class RpcProxy(object):
         self._set_version(msg, version)
         rpc.cast(context, self._get_topic(topic), msg)
 
-    def fanout_cast(self, context, msg, version=None):
+    def fanout_cast(self, context, msg, topic=None, version=None):
         """rpc.fanout_cast() a remote method.
 
         :param context: The request context
         :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
         :param version: (Optional) Override the requested API version in this
                message.
 
@@ -124,7 +125,7 @@ class RpcProxy(object):
                   from the remote method.
         """
         self._set_version(msg, version)
-        rpc.fanout_cast(context, self.topic, msg)
+        rpc.fanout_cast(context, self._get_topic(topic), msg)
 
     def cast_to_server(self, context, server_params, msg, topic=None,
                        version=None):
@@ -144,13 +145,15 @@ class RpcProxy(object):
         self._set_version(msg, version)
         rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
 
-    def fanout_cast_to_server(self, context, server_params, msg, version=None):
+    def fanout_cast_to_server(self, context, server_params, msg, topic=None,
+                              version=None):
         """rpc.fanout_cast_to_server() a remote method.
 
         :param context: The request context
         :param server_params: Server parameters.  See rpc.cast_to_server() for
                details.
         :param msg: The message to send, including the method and args.
+        :param topic: Override the topic for this message.
         :param version: (Optional) Override the requested API version in this
                message.
 
@@ -158,4 +161,5 @@ class RpcProxy(object):
                   return values.
         """
         self._set_version(msg, version)
-        rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
+        rpc.fanout_cast_to_server(context, server_params,
+                                  self._get_topic(topic), msg)
index 59f255d0c0cd8ca5605444d31c6ea622b85d3dd1..628f5e3c9b2f68e2b930982b470786dbdb4c6340 100644 (file)
@@ -52,7 +52,6 @@ def canonicalize_emails(changelog, mapping):
 
 # Get requirements from the first file that exists
 def get_reqs_from_files(requirements_files):
-    reqs_in = []
     for requirements_file in requirements_files:
         if os.path.exists(requirements_file):
             return open(requirements_file, 'r').read().split('\n')
@@ -144,8 +143,8 @@ def _get_git_next_version_suffix(branch_name):
     # where the bit after the last . is the short sha, and the bit between
     # the last and second to last is the revno count
     (revno, sha) = post_version.split(".")[-2:]
-    first_half = "%(milestonever)s~%(datestamp)s" % locals()
-    second_half = "%(revno_prefix)s%(revno)s.%(sha)s" % locals()
+    first_half = "%s~%s" % (milestonever, datestamp)
+    second_half = "%s%s.%s" % (revno_prefix, revno, sha)
     return ".".join((first_half, second_half))
 
 
index 5eeaf70aa49fc7b3d35ad7fa5e4d4d7be0f57106..c4f6cf0497a5573270f104e98061ff69587213e6 100644 (file)
@@ -21,7 +21,6 @@ Time related utilities and helper functions.
 
 import calendar
 import datetime
-import time
 
 import iso8601
 
@@ -94,16 +93,34 @@ def set_time_override(override_time=datetime.datetime.utcnow()):
 
 
 def advance_time_delta(timedelta):
-    """Advance overriden time using a datetime.timedelta."""
+    """Advance overridden time using a datetime.timedelta."""
     assert(not utcnow.override_time is None)
     utcnow.override_time += timedelta
 
 
 def advance_time_seconds(seconds):
-    """Advance overriden time by seconds."""
+    """Advance overridden time by seconds."""
     advance_time_delta(datetime.timedelta(0, seconds))
 
 
 def clear_time_override():
     """Remove the overridden time."""
     utcnow.override_time = None
+
+
+def marshall_now(now=None):
+    """Make an rpc-safe datetime with microseconds.
+
+    Note: tzinfo is stripped, but not required for relative times."""
+    if not now:
+        now = utcnow()
+    return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
+                minute=now.minute, second=now.second,
+                microsecond=now.microsecond)
+
+
+def unmarshall_time(tyme):
+    """Unmarshall a datetime dict."""
+    return datetime.datetime(day=tyme['day'], month=tyme['month'],
+                 year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
+                 second=tyme['second'], microsecond=tyme['microsecond'])
index 5d0099e92549af07255797182c3e160e1b2f8f16..e1d847ccb7af7f2122ae61050b01554e8a60d312 100644 (file)
@@ -20,7 +20,6 @@ System-level utilities and helper functions.
 """
 
 import logging
-import os
 import random
 import shlex
 
@@ -65,6 +64,50 @@ def bool_from_string(subject):
     return False
 
 
+def parse_host_port(address, default_port=None):
+    """
+    Interpret a string as a host:port pair.
+    An IPv6 address MUST be escaped if accompanied by a port,
+    because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+    means both [2001:db8:85a3::8a2e:370:7334] and
+    [2001:db8:85a3::8a2e:370]:7334.
+
+    >>> parse_host_port('server01:80')
+    ('server01', 80)
+    >>> parse_host_port('server01')
+    ('server01', None)
+    >>> parse_host_port('server01', default_port=1234)
+    ('server01', 1234)
+    >>> parse_host_port('[::1]:80')
+    ('::1', 80)
+    >>> parse_host_port('[::1]')
+    ('::1', None)
+    >>> parse_host_port('[::1]', default_port=1234)
+    ('::1', 1234)
+    >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+    ('2001:db8:85a3::8a2e:370:7334', 1234)
+
+    """
+    if address[0] == '[':
+        # Escaped ipv6
+        _host, _port = address[1:].split(']')
+        host = _host
+        if ':' in _port:
+            port = _port.split(':')[1]
+        else:
+            port = default_port
+    else:
+        if address.count(':') == 1:
+            host, port = address.split(':')
+        else:
+            # 0 means ipv4, >1 means ipv6.
+            # We prohibit unescaped ipv6 addresses with port.
+            host = address
+            port = default_port
+
+    return (host, None if port is None else int(port))
+
+
 def execute(*cmd, **kwargs):
     """
     Helper method to execute command with optional retry.