class ConfigOpts(object):
- def __init__(self, ...):
+ def __call__(self, ...):
opts = [
MultiStrOpt('config-file',
...
]
+This module also contains a global instance of the CommonConfigOpts class
+in order to support a common usage pattern in OpenStack:
+
+ from openstack.common import cfg
+
+ opts = [
+ cfg.StrOpt('bind_host' default='0.0.0.0'),
+ cfg.IntOpt('bind_port', default=9292),
+ ]
+
+ CONF = cfg.CONF
+ CONF.register_opts(opts)
+
+ def start(server, app):
+ server.start(app, CONF.bind_port, CONF.bind_host)
+
"""
import collections
return True
+ def _unregister_opt(self, opt):
+ """Remove an opt from this group.
+
+ :param opt: an Opt object
+ """
+ if opt.dest in self._opts:
+ del self._opts[opt.dest]
+
def _get_optparse_group(self, parser):
"""Build an optparse.OptionGroup for this group."""
if self._optparse_group is None:
self.help)
return self._optparse_group
+ def _clear(self):
+ """Clear this group's option parsing state."""
+ self._optparse_group = None
+
class ParseError(iniparser.ParseError):
def __init__(self, msg, lineno, line, filename):
the values of options.
"""
- def __init__(self,
- project=None,
- prog=None,
- version=None,
- usage=None,
- default_config_files=None):
- """Construct a ConfigOpts object.
+ def __init__(self):
+ """Construct a ConfigOpts object."""
+ self._opts = {} # dict of dicts of (opt:, override:, default:)
+ self._groups = {}
- Automatically registers the --config-file option with either a supplied
- list of default config files, or a list from find_config_files().
+ self._args = None
+ self._oparser = None
+ self._cparser = None
+ self._cli_values = {}
+ self.__cache = {}
+ self._config_opts = []
+ self._disable_interspersed_args = False
- :param project: the toplevel project name, used to locate config files
- :param prog: the name of the program (defaults to sys.argv[0] basename)
- :param version: the program version (for --version)
- :param usage: a usage string (%prog will be expanded)
- :param default_config_files: config files to use by default
- """
+ def _setup(self, project, prog, version, usage, default_config_files):
+ """Initialize a ConfigOpts object for option parsing."""
if prog is None:
prog = os.path.basename(sys.argv[0])
if default_config_files is None:
default_config_files = find_config_files(project, prog)
- self.project = project
- self.prog = prog
- self.version = version
- self.usage = usage
- self.default_config_files = default_config_files
-
- self._opts = {} # dict of dicts of (opt:, override:, default:)
- self._groups = {}
-
- self._args = None
- self._cli_values = {}
-
- self._oparser = optparse.OptionParser(prog=self.prog,
- version=self.version,
- usage=self.usage)
- self._cparser = None
+ self._oparser = optparse.OptionParser(prog=prog,
+ version=version,
+ usage=usage)
+ if self._disable_interspersed_args:
+ self._oparser.disable_interspersed_args()
- self.__cache = {}
-
- opts = [
+ self._config_opts = [
MultiStrOpt('config-file',
- default=self.default_config_files,
+ default=default_config_files,
metavar='PATH',
help='Path to a config file to use. Multiple config '
'files can be specified, with values in later '
'files taking precedence. The default files '
- ' used are: %s' %
- (self.default_config_files, )),
+ ' used are: %s' % (default_config_files, )),
StrOpt('config-dir',
metavar='DIR',
help='Path to a config directory to pull *.conf '
'hence over-ridden options in the directory take '
'precedence.'),
]
- self.register_cli_opts(opts)
+ self.register_cli_opts(self._config_opts)
+
+ self.project = project
+ self.prog = prog
+ self.version = version
+ self.usage = usage
+ self.default_config_files = default_config_files
def __clear_cache(f):
@functools.wraps(f)
return __inner
- def __call__(self, args=None):
+ def __call__(self,
+ args=None,
+ project=None,
+ prog=None,
+ version=None,
+ usage=None,
+ default_config_files=None):
"""Parse command line arguments and config files.
Calling a ConfigOpts object causes the supplied command line arguments
The object may be called multiple times, each time causing the previous
set of values to be overwritten.
+ Automatically registers the --config-file option with either a supplied
+ list of default config files, or a list from find_config_files().
+
If the --config-dir option is set, any *.conf files from this
directory are pulled in, after all the file(s) specified by the
--config-file option.
- :params args: command line arguments (defaults to sys.argv[1:])
+ :param args: command line arguments (defaults to sys.argv[1:])
+ :param project: the toplevel project name, used to locate config files
+ :param prog: the name of the program (defaults to sys.argv[0] basename)
+ :param version: the program version (for --version)
+ :param usage: a usage string (%prog will be expanded)
+ :param default_config_files: config files to use by default
:returns: the list of arguments left over after parsing options
:raises: SystemExit, ConfigFilesNotFoundError, ConfigFileParseError,
- RequiredOptError
+ RequiredOptError, DuplicateOptError
"""
self.clear()
- self._args = args
+ self._setup(project, prog, version, usage, default_config_files)
- (values, args) = self._oparser.parse_args(self._args)
+ self._cli_values, leftovers = self._parse_cli_opts(args)
- self._cli_values = vars(values)
-
- def _list_config_dir():
- return sorted(glob.glob(os.path.join(self.config_dir, '*.conf')))
-
- from_file = list(self.config_file)
-
- from_dir = _list_config_dir() if self.config_dir else []
-
- self._parse_config_files(from_file + from_dir)
+ self._parse_config_files()
self._check_required_opts()
- return args
+ return leftovers
def __getattr__(self, name):
"""Look up an option value and perform string substitution.
def clear(self):
"""Clear the state of the object to before it was called."""
self._args = None
- self._cli_values = {}
+ self._cli_values.clear()
+ self._oparser = None
self._cparser = None
+ self.unregister_opts(self._config_opts)
+ for group in self._groups.values():
+ group._clear()
@__clear_cache
def register_opt(self, opt, group=None):
if self._args is not None:
raise ArgsAlreadyParsedError("cannot register CLI option")
- if not self.register_opt(opt, group, clear_cache=False):
- return False
-
- if group is not None:
- group = self._get_group(group, autocreate=True)
-
- opt._add_to_cli(self._oparser, group)
-
- return True
+ return self.register_opt(opt, group, clear_cache=False)
@__clear_cache
def register_cli_opts(self, opts, group=None):
self._groups[group.name] = copy.copy(group)
+ @__clear_cache
+ def unregister_opt(self, opt, group=None):
+ """Unregister an option.
+
+ :param opt: an Opt object
+ :param group: an optional OptGroup object or group name
+ :raises: ArgsAlreadyParsedError, NoSuchGroupError
+ """
+ if self._args is not None:
+ raise ArgsAlreadyParsedError("reset before unregistering options")
+
+ if group is not None:
+ self._get_group(group)._unregister_opt(opt)
+ elif opt.dest in self._opts:
+ del self._opts[opt.dest]
+
+ @__clear_cache
+ def unregister_opts(self, opts, group=None):
+ """Unregister multiple CLI option schemas at once."""
+ for opt in opts:
+ self.unregister_opt(opt, group, clear_cache=False)
+
@__clear_cache
def set_override(self, name, override, group=None):
"""Override an opt value.
opt_info = self._get_opt_info(name, group)
opt_info['default'] = default
+ def _all_opt_infos(self):
+ """A generator function for iteration opt infos."""
+ for info in self._opts.values():
+ yield info, None
+ for group in self._groups.values():
+ for info in group._opts.values():
+ yield info, group
+
+ def _all_opts(self):
+ """A generator function for iteration opts."""
+ for info, group in self._all_opt_infos():
+ yield info['opt'], group
+
def _unset_defaults_and_overrides(self):
"""Unset any default or override on all options."""
- def unset(opts):
- for info in opts.values():
- info['default'] = None
- info['override'] = None
-
- unset(self._opts)
- for group in self._groups.values():
- unset(group._opts)
+ for info, group in self._all_opt_infos():
+ info['default'] = None
+ info['override'] = None
def disable_interspersed_args(self):
"""Set parsing to stop on the first non-option.
i.e. argument parsing is stopped at the first non-option argument.
"""
- self._oparser.disable_interspersed_args()
+ self._disable_interspersed_args = True
def enable_interspersed_args(self):
"""Set parsing to not stop on the first non-option.
This it the default behaviour."""
- self._oparser.enable_interspersed_args()
+ self._disable_interspersed_args = False
def find_file(self, name):
"""Locate a file located alongside the config files.
return opts[opt_name]
- def _parse_config_files(self, config_files):
- """Parse the supplied configuration files.
+ def _parse_config_files(self):
+ """Parse the config files from --config-file and --config-dir.
:raises: ConfigFilesNotFoundError, ConfigFileParseError
"""
+ config_files = list(self.config_file)
+
+ if self.config_dir:
+ config_dir_glob = os.path.join(self.config_dir, '*.conf')
+ config_files += sorted(glob.glob(config_dir_glob))
+
self._cparser = MultiConfigParser()
try:
not_read_ok = filter(lambda f: f not in read_ok, config_files)
raise ConfigFilesNotFoundError(not_read_ok)
- def _do_check_required_opts(self, opts, group=None):
- for info in opts.values():
+ def _check_required_opts(self):
+ """Check that all opts marked as required have values specified.
+
+ :raises: RequiredOptError
+ """
+ for info, group in self._all_opt_infos():
default, opt, override = [info[k] for k in sorted(info.keys())]
if opt.required:
if self._get(opt.name, group) is None:
raise RequiredOptError(opt.name, group)
- def _check_required_opts(self):
- """Check that all opts marked as required have values specified.
+ def _parse_cli_opts(self, args):
+ """Parse command line options.
+
+ Initializes the command line option parser and parses the supplied
+ command line arguments.
+
+ :param args: the command line arguments
+ :returns: a dict of parsed option values
+ :raises: SystemExit, DuplicateOptError
- :raises: RequiredOptError
"""
- self._do_check_required_opts(self._opts)
+ self._args = args
- for group in self._groups.values():
- self._do_check_required_opts(group._opts, group)
+ for opt, group in self._all_opts():
+ opt._add_to_cli(self._oparser, group)
+
+ values, leftovers = self._oparser.parse_args(args)
+
+ return vars(values), leftovers
class GroupAttr(collections.Mapping):
help='syslog facility to receive log lines')
]
- def __init__(self, **kwargs):
- super(CommonConfigOpts, self).__init__(**kwargs)
+ def __init__(self):
+ super(CommonConfigOpts, self).__init__()
self.register_cli_opts(self.common_cli_opts)
self.register_cli_opts(self.logging_cli_opts)
+
+
+CONF = CommonConfigOpts()
def setUp(self):
super(FlagsTestCase, self).setUp()
- self.FLAGS = flags.CinderConfigOpts()
- self.global_FLAGS = flags.FLAGS
- self.flags(config_file=[])
def test_declare(self):
- self.assert_('answer' not in self.global_FLAGS)
+ self.assert_('answer' not in FLAGS)
flags.DECLARE('answer', 'cinder.tests.declare_flags')
- self.assert_('answer' in self.global_FLAGS)
- self.assertEqual(self.global_FLAGS.answer, 42)
+ self.assert_('answer' in FLAGS)
+ self.assertEqual(FLAGS.answer, 42)
# Make sure we don't overwrite anything
- self.global_FLAGS.set_override('answer', 256)
- self.assertEqual(self.global_FLAGS.answer, 256)
+ FLAGS.set_override('answer', 256)
+ self.assertEqual(FLAGS.answer, 256)
flags.DECLARE('answer', 'cinder.tests.declare_flags')
- self.assertEqual(self.global_FLAGS.answer, 256)
+ self.assertEqual(FLAGS.answer, 256)
def test_getopt_non_interspersed_args(self):
- self.assert_('runtime_answer' not in self.global_FLAGS)
+ self.assert_('runtime_answer' not in FLAGS)
argv = ['flags_test', 'extra_arg', '--runtime_answer=60']
- args = self.global_FLAGS(argv)
+ args = flags.parse_args(argv, default_config_files=[])
self.assertEqual(len(args), 3)
self.assertEqual(argv, args)
def test_runtime_and_unknown_flags(self):
- self.assert_('runtime_answer' not in self.global_FLAGS)
+ self.assert_('runtime_answer' not in FLAGS)
import cinder.tests.runtime_flags
- self.assert_('runtime_answer' in self.global_FLAGS)
- self.assertEqual(self.global_FLAGS.runtime_answer, 54)
+ self.assert_('runtime_answer' in FLAGS)
+ self.assertEqual(FLAGS.runtime_answer, 54)
def test_long_vs_short_flags(self):
- self.global_FLAGS.clear()
- self.global_FLAGS.register_cli_opt(cfg.StrOpt('duplicate_answer_long',
- default='val',
- help='desc'))
+ FLAGS.clear()
+ FLAGS.register_cli_opt(cfg.StrOpt('duplicate_answer_long',
+ default='val',
+ help='desc'))
argv = ['flags_test', '--duplicate_answer=60', 'extra_arg']
- args = self.global_FLAGS(argv)
+ args = flags.parse_args(argv, default_config_files=[])
- self.assert_('duplicate_answer' not in self.global_FLAGS)
- self.assert_(self.global_FLAGS.duplicate_answer_long, 60)
+ self.assert_('duplicate_answer' not in FLAGS)
+ self.assert_(FLAGS.duplicate_answer_long, 60)
- self.global_FLAGS.clear()
- self.global_FLAGS.register_cli_opt(cfg.IntOpt('duplicate_answer',
- default=60,
- help='desc'))
- args = self.global_FLAGS(argv)
- self.assertEqual(self.global_FLAGS.duplicate_answer, 60)
- self.assertEqual(self.global_FLAGS.duplicate_answer_long, 'val')
+ FLAGS.clear()
+ FLAGS.register_cli_opt(cfg.IntOpt('duplicate_answer',
+ default=60,
+ help='desc'))
+ args = flags.parse_args(argv, default_config_files=[])
+ self.assertEqual(FLAGS.duplicate_answer, 60)
+ self.assertEqual(FLAGS.duplicate_answer_long, 'val')
def test_flag_leak_left(self):
self.assertEqual(FLAGS.flags_unittest, 'foo')
self.assertEqual(FLAGS.flags_unittest, 'bar')
FLAGS.reset()
self.assertEqual(FLAGS.flags_unittest, 'foo')
-
- def test_defaults(self):
- self.FLAGS.register_opt(cfg.StrOpt('foo', default='bar', help='desc'))
- self.assertEqual(self.FLAGS.foo, 'bar')
-
- self.FLAGS.set_default('foo', 'blaa')
- self.assertEqual(self.FLAGS.foo, 'blaa')
-
- def test_templated_values(self):
- self.FLAGS.register_opt(cfg.StrOpt('foo', default='foo', help='desc'))
- self.FLAGS.register_opt(cfg.StrOpt('bar', default='bar', help='desc'))
- self.FLAGS.register_opt(cfg.StrOpt('blaa',
- default='$foo$bar', help='desc'))
- self.assertEqual(self.FLAGS.blaa, 'foobar')