raise exception.InvalidInput(reason=msg)
-def fetchfile(url, target):
- LOG.debug(_('Fetching %s') % url)
- execute('curl', '--fail', url, '-o', target)
-
-
def execute(*cmd, **kwargs):
"""Convenience wrapper around oslo's execute() method."""
if 'run_as_root' in kwargs and not 'root_helper' in kwargs:
return (stdout, stderr)
-def trycmd(*args, **kwargs):
- """Convenience wrapper around oslo's trycmd() method."""
- if 'run_as_root' in kwargs and not 'root_helper' in kwargs:
- kwargs['root_helper'] = get_root_helper()
- try:
- (stdout, stderr) = processutils.trycmd(*args, **kwargs)
- except processutils.ProcessExecutionError as ex:
- raise exception.ProcessExecutionError(
- exit_code=ex.exit_code,
- stderr=ex.stderr,
- stdout=ex.stdout,
- cmd=ex.cmd,
- description=ex.description)
- except processutils.UnknownArgumentError as ex:
- raise exception.Error(ex.message)
- return (stdout, stderr)
-
-
def check_ssh_injection(cmd_list):
ssh_injection_pattern = ['`', '$', '|', '||', ';', '&', '&&', '>', '>>',
'<']
return arg
-def generate_uid(topic, size=8):
- characters = '01234567890abcdefghijklmnopqrstuvwxyz'
- choices = [random.choice(characters) for x in xrange(size)]
- return '%s-%s' % (topic, ''.join(choices))
-
-
# Default symbols to use for passwords. Avoids visually confusing characters.
# ~6 bits per symbol
DEFAULT_PASSWORD_SYMBOLS = ('23456789', # Removed: 0,1
return generate_password(length, symbolgroups)
-def last_octet(address):
- return int(address.split('.')[-1])
-
-
-def get_my_linklocal(interface):
- try:
- if_str = execute('ip', '-f', 'inet6', '-o', 'addr', 'show', interface)
- condition = '\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link'
- links = [re.search(condition, x) for x in if_str[0].split('\n')]
- address = [w.group(1) for w in links if w is not None]
- if address[0] is not None:
- return address[0]
- else:
- raise exception.Error(_('Link Local address is not found.:%s')
- % if_str)
- except Exception as ex:
- raise exception.Error(_("Couldn't get Link Local IP of %(interface)s"
- " :%(ex)s") %
- {'interface': interface, 'ex': ex, })
-
-
-def parse_mailmap(mailmap='.mailmap'):
- mapping = {}
- if os.path.exists(mailmap):
- fp = open(mailmap, 'r')
- for l in fp:
- l = l.strip()
- if not l.startswith('#') and ' ' in l:
- canonical_email, alias = l.split(' ')
- mapping[alias.lower()] = canonical_email.lower()
- return mapping
-
-
-def str_dict_replace(s, mapping):
- for s1, s2 in mapping.iteritems():
- s = s.replace(s1, s2)
- return s
-
-
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
return get_from_path(results, remainder)
-def flatten_dict(dict_, flattened=None):
- """Recursively flatten a nested dictionary."""
- flattened = flattened or {}
- for key, value in dict_.iteritems():
- if hasattr(value, 'iteritems'):
- flatten_dict(value, flattened)
- else:
- flattened[key] = value
- return flattened
-
-
def map_dict_keys(dict_, key_map):
"""Return a dict in which the dictionaries keys are mapped to new keys."""
mapped = {}