# License for the specific language governing permissions and limitations
# under the License.
-"""Root wrapper for Neutron
+from neutron.openstack.common.rootwrap import cmd
- Filters which commands neutron is allowed to run as another user.
-
- To use this, you should set the following in neutron.conf and the
- various .ini files for the agent plugins:
- root_helper=sudo neutron-rootwrap /etc/neutron/rootwrap.conf
-
- You also need to let the neutron user run neutron-rootwrap as root in
- /etc/sudoers:
- neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
- /etc/neutron/rootwrap.conf *
-
- Filter specs live in /etc/neutron/rootwrap.d/*.filters, or
- other locations pointed to by /etc/neutron/rootwrap.conf.
- To make allowed commands node-specific, your packaging should only
- install apropriate .filters for commands which are needed on each
- node.
-"""
-
-from __future__ import print_function
-
-import ConfigParser
-import logging
-import os
-import pwd
-import signal
-import subprocess
-import sys
-
-
-RC_UNAUTHORIZED = 99
-RC_NOCOMMAND = 98
-RC_BADCONFIG = 97
-RC_NOEXECFOUND = 96
-
-
-def _subprocess_setup():
- # Python installs a SIGPIPE handler by default. This is usually not what
- # non-Python subprocesses expect.
- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
-
-def _exit_error(execname, message, errorcode, log=True):
- print("%s: %s" % (execname, message))
- if log:
- logging.error(message)
- sys.exit(errorcode)
-
-
-if __name__ == '__main__':
- # Split arguments, require at least a command
- execname = sys.argv.pop(0)
- if len(sys.argv) < 2:
- _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
-
- configfile = sys.argv.pop(0)
- userargs = sys.argv[:]
-
- # Add ../ to sys.path to allow running from branch
- possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
- os.pardir, os.pardir))
- if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
- sys.path.insert(0, possible_topdir)
-
- from neutron.rootwrap import wrapper
-
- # Load configuration
- try:
- rawconfig = ConfigParser.RawConfigParser()
- rawconfig.read(configfile)
- config = wrapper.RootwrapConfig(rawconfig)
- except ValueError as exc:
- msg = "Incorrect value in %s: %s" % (configfile, exc.message)
- _exit_error(execname, msg, RC_BADCONFIG, log=False)
- except ConfigParser.Error:
- _exit_error(execname, "Incorrect configuration file: %s" % configfile,
- RC_BADCONFIG, log=False)
-
- if config.use_syslog:
- wrapper.setup_syslog(execname,
- config.syslog_log_facility,
- config.syslog_log_level)
-
- # Execute command if it matches any of the loaded filters
- filters = wrapper.load_filters(config.filters_path)
- try:
- filtermatch = wrapper.match_filter(filters, userargs,
- exec_dirs=config.exec_dirs)
- if filtermatch:
- command = filtermatch.get_command(userargs,
- exec_dirs=config.exec_dirs)
- if config.use_syslog:
- logging.info("(%s > %s) Executing %s (filter match = %s)" % (
- os.getlogin(), pwd.getpwuid(os.getuid())[0],
- command, filtermatch.name))
-
- obj = subprocess.Popen(command,
- stdin=sys.stdin,
- stdout=sys.stdout,
- stderr=sys.stderr,
- preexec_fn=_subprocess_setup,
- env=filtermatch.get_environment(userargs))
- obj.wait()
- sys.exit(obj.returncode)
-
- except wrapper.FilterMatchNotExecutable as exc:
- msg = ("Executable not found: %s (filter match = %s)"
- % (exc.match.exec_path, exc.match.name))
- _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
-
- except wrapper.NoFilterMatched:
- msg = ("Unauthorized command: %s (no filter matched)"
- % ' '.join(userargs))
- _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
+cmd.main()
# License for the specific language governing permissions and limitations
# under the License.
-"""Root wrapper for Neutron
+from neutron.openstack.common.rootwrap import cmd
- Filters which commands neutron is allowed to run as another user.
-
- To use this, you should set the following in neutron.conf and the
- various .ini files for the agent plugins:
- root_helper=sudo neutron-rootwrap /etc/neutron/rootwrap.conf
-
- You also need to let the neutron user run neutron-rootwrap as root in
- /etc/sudoers:
- neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
- /etc/neutron/rootwrap.conf *
-
- Filter specs live in /etc/neutron/rootwrap.d/*.filters, or
- other locations pointed to by /etc/neutron/rootwrap.conf.
- To make allowed commands node-specific, your packaging should only
- install apropriate .filters for commands which are needed on each
- node.
-"""
-
-from __future__ import print_function
-
-import ConfigParser
-import logging
-import os
-import pwd
-import signal
-import subprocess
-import sys
-
-
-RC_UNAUTHORIZED = 99
-RC_NOCOMMAND = 98
-RC_BADCONFIG = 97
-RC_NOEXECFOUND = 96
-
-
-def _subprocess_setup():
- # Python installs a SIGPIPE handler by default. This is usually not what
- # non-Python subprocesses expect.
- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
-
-def _exit_error(execname, message, errorcode, log=True):
- print("%s: %s" % (execname, message))
- if log:
- logging.error(message)
- sys.exit(errorcode)
-
-
-if __name__ == '__main__':
- # Split arguments, require at least a command
- execname = sys.argv.pop(0)
- if len(sys.argv) < 2:
- _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
-
- configfile = sys.argv.pop(0)
- userargs = sys.argv[:]
-
- # Add ../ to sys.path to allow running from branch
- possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
- os.pardir, os.pardir))
- if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
- sys.path.insert(0, possible_topdir)
-
- from neutron.rootwrap import wrapper
-
- # Load configuration
- try:
- rawconfig = ConfigParser.RawConfigParser()
- rawconfig.read(configfile)
- config = wrapper.RootwrapConfig(rawconfig)
- except ValueError as exc:
- msg = "Incorrect value in %s: %s" % (configfile, exc.message)
- _exit_error(execname, msg, RC_BADCONFIG, log=False)
- except ConfigParser.Error:
- _exit_error(execname, "Incorrect configuration file: %s" % configfile,
- RC_BADCONFIG, log=False)
-
- if config.use_syslog:
- wrapper.setup_syslog(execname,
- config.syslog_log_facility,
- config.syslog_log_level)
-
- # Execute command if it matches any of the loaded filters
- filters = wrapper.load_filters(config.filters_path)
- try:
- filtermatch = wrapper.match_filter(filters, userargs,
- exec_dirs=config.exec_dirs)
- if filtermatch:
- command = filtermatch.get_command(userargs,
- exec_dirs=config.exec_dirs)
- if config.use_syslog:
- logging.info("(%s > %s) Executing %s (filter match = %s)" % (
- os.getlogin(), pwd.getpwuid(os.getuid())[0],
- command, filtermatch.name))
-
- obj = subprocess.Popen(command,
- stdin=sys.stdin,
- stdout=sys.stdout,
- stderr=sys.stderr,
- preexec_fn=_subprocess_setup,
- env=filtermatch.get_environment(userargs))
- obj.wait()
- sys.exit(obj.returncode)
-
- except wrapper.FilterMatchNotExecutable as exc:
- msg = ("Executable not found: %s (filter match = %s)"
- % (exc.match.exec_path, exc.match.name))
- _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
-
- except wrapper.NoFilterMatched:
- msg = ("Unauthorized command: %s (no filter matched)"
- % ' '.join(userargs))
- _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
+cmd.main()
if os.path.exists(os.path.join(possible_topdir, "quantum", "__init__.py")):
sys.path.insert(0, possible_topdir)
- from quantum.rootwrap import wrapper
+ from neutron.openstack.common.rootwrap import wrapper
# Execute command if it matches any of the loaded filters
filters = wrapper.load_filters(filters_path)
[Filters]
# dhcp-agent
-ip_exec_dnsmasq: DnsmasqNetnsFilter, ip, root
-dnsmasq: DnsmasqFilter, /sbin/dnsmasq, root
-dnsmasq_usr: DnsmasqFilter, /usr/sbin/dnsmasq, root
+dnsmasq: EnvFilter, dnsmasq, root, NEUTRON_RELAY_SOCKET_PATH=, NEUTRON_NETWORK_ID=
# dhcp-agent uses kill as well, that's handled by the generic KillFilter
# it looks like these are the only signals needed, per
# neutron/agent/linux/dhcp.py
+# Configuration for neutron-rootwrap
+# This file should be owned by (and only-writeable by) the root user
+
[DEFAULT]
# List of directories to load filter definitions from (separated by ',').
# These directories MUST all be only writeable by root !
# These directories MUST all be only writeable by root !
exec_dirs=/sbin,/usr/sbin,/bin,/usr/bin
+# Enable logging to syslog
+# Default value is False
+use_syslog=False
+
+# Which syslog facility to use.
+# Valid values include auth, authpriv, syslog, user0, user1...
+# Default value is 'syslog'
+syslog_log_facility=syslog
+
+# Which messages to log.
+# INFO means log all usage
+# ERROR means only log unsuccessful attempts
+syslog_log_level=ERROR
+
[xenapi]
# XenAPI configuration is only required by the L2 agent if it is to
# target a XenServer/XCP compute host's dom0.
elif not self._parent.namespace:
raise Exception(_('No namespace defined for parent'))
else:
+ env_params = []
+ if addl_env:
+ env_params = (['env'] +
+ ['%s=%s' % pair for pair in addl_env.items()])
return utils.execute(
- ['%s=%s' % pair for pair in addl_env.items()] +
- ['ip', 'netns', 'exec', self._parent.namespace] + list(cmds),
+ ['ip', 'netns', 'exec', self._parent.namespace] +
+ env_params + list(cmds),
root_helper=self._parent.root_helper,
check_exit_code=check_exit_code)
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright (c) 2012 OpenStack Foundation.
+# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Root wrapper for OpenStack services
+
+ Filters which commands a service is allowed to run as another user.
+
+ To use this with neutron, you should set the following in
+ neutron.conf:
+ rootwrap_config=/etc/neutron/rootwrap.conf
+
+ You also need to let the neutron user run neutron-rootwrap
+ as root in sudoers:
+ neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
+ /etc/neutron/rootwrap.conf *
+
+ Service packaging should deploy .filters files only on nodes where
+ they are needed, to avoid allowing more than is necessary.
+"""
+
+from __future__ import print_function
+
+import ConfigParser
+import logging
+import os
+import pwd
+import signal
+import subprocess
+import sys
+
+
+RC_UNAUTHORIZED = 99
+RC_NOCOMMAND = 98
+RC_BADCONFIG = 97
+RC_NOEXECFOUND = 96
+
+
+def _subprocess_setup():
+ # Python installs a SIGPIPE handler by default. This is usually not what
+ # non-Python subprocesses expect.
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def _exit_error(execname, message, errorcode, log=True):
+ print("%s: %s" % (execname, message))
+ if log:
+ logging.error(message)
+ sys.exit(errorcode)
+
+
+def main():
+ # Split arguments, require at least a command
+ execname = sys.argv.pop(0)
+ if len(sys.argv) < 2:
+ _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
+
+ configfile = sys.argv.pop(0)
+ userargs = sys.argv[:]
+
+ # Add ../ to sys.path to allow running from branch
+ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
+ os.pardir, os.pardir))
+ if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
+ sys.path.insert(0, possible_topdir)
+
+ from neutron.openstack.common.rootwrap import wrapper
+
+ # Load configuration
+ try:
+ rawconfig = ConfigParser.RawConfigParser()
+ rawconfig.read(configfile)
+ config = wrapper.RootwrapConfig(rawconfig)
+ except ValueError as exc:
+ msg = "Incorrect value in %s: %s" % (configfile, exc.message)
+ _exit_error(execname, msg, RC_BADCONFIG, log=False)
+ except ConfigParser.Error:
+ _exit_error(execname, "Incorrect configuration file: %s" % configfile,
+ RC_BADCONFIG, log=False)
+
+ if config.use_syslog:
+ wrapper.setup_syslog(execname,
+ config.syslog_log_facility,
+ config.syslog_log_level)
+
+ # Execute command if it matches any of the loaded filters
+ filters = wrapper.load_filters(config.filters_path)
+ try:
+ filtermatch = wrapper.match_filter(filters, userargs,
+ exec_dirs=config.exec_dirs)
+ if filtermatch:
+ command = filtermatch.get_command(userargs,
+ exec_dirs=config.exec_dirs)
+ if config.use_syslog:
+ logging.info("(%s > %s) Executing %s (filter match = %s)" % (
+ os.getlogin(), pwd.getpwuid(os.getuid())[0],
+ command, filtermatch.name))
+
+ obj = subprocess.Popen(command,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ preexec_fn=_subprocess_setup,
+ env=filtermatch.get_environment(userargs))
+ obj.wait()
+ sys.exit(obj.returncode)
+
+ except wrapper.FilterMatchNotExecutable as exc:
+ msg = ("Executable not found: %s (filter match = %s)"
+ % (exc.match.exec_path, exc.match.name))
+ _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
+
+ except wrapper.NoFilterMatched:
+ msg = ("Unauthorized command: %s (no filter matched)"
+ % ' '.join(userargs))
+ _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright (c) 2012 OpenStack Foundation.
+# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
if self.real_exec is not None:
return self.real_exec
self.real_exec = ""
- if self.exec_path.startswith('/'):
+ if os.path.isabs(self.exec_path):
if os.access(self.exec_path, os.X_OK):
self.real_exec = self.exec_path
else:
return None
-class ExecCommandFilter(CommandFilter):
- def exec_args(self, userargs):
- return []
-
-
class RegExpFilter(CommandFilter):
"""Command filter doing regexp matching for every argument."""
class DnsmasqFilter(CommandFilter):
"""Specific filter for the dnsmasq call (which includes env)."""
- def is_dnsmasq_cmd(self, argv):
- if (argv[0] == "dnsmasq"):
- return True
- return False
-
- def is_dnsmasq_env_vars(self, argv):
- if (argv[0].startswith("NEUTRON_RELAY_SOCKET_PATH=") and
- argv[1].startswith("NEUTRON_NETWORK_ID=")):
- return True
- return False
+ CONFIG_FILE_ARG = 'CONFIG_FILE'
def match(self, userargs):
- """This matches the combination of the leading env
- vars plus "dnsmasq"
- """
- if (self.is_dnsmasq_env_vars(userargs) and
- self.is_dnsmasq_cmd(userargs[2:])):
+ if (userargs[0] == 'env' and
+ userargs[1].startswith(self.CONFIG_FILE_ARG) and
+ userargs[2].startswith('NETWORK_ID=') and
+ userargs[3] == 'dnsmasq'):
return True
return False
def get_command(self, userargs, exec_dirs=[]):
to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
- return [to_exec] + userargs[3:]
+ dnsmasq_pos = userargs.index('dnsmasq')
+ return [to_exec] + userargs[dnsmasq_pos + 1:]
def get_environment(self, userargs):
env = os.environ.copy()
- env['NEUTRON_RELAY_SOCKET_PATH'] = userargs[0].split('=')[-1]
- env['NEUTRON_NETWORK_ID'] = userargs[1].split('=')[-1]
+ env[self.CONFIG_FILE_ARG] = userargs[1].split('=')[-1]
+ env['NETWORK_ID'] = userargs[2].split('=')[-1]
return env
-class DnsmasqNetnsFilter(DnsmasqFilter):
- """Specific filter for the dnsmasq call (which includes env)."""
-
- def is_ip_netns_cmd(self, argv):
- if ((argv[0] == "ip") and
- (argv[1] == "netns") and
- (argv[2] == "exec")):
- return True
- return False
-
- def match(self, userargs):
- """This matches the combination of the leading env
- vars plus "ip" "netns" "exec" <foo> "dnsmasq"
- """
- if (self.is_dnsmasq_env_vars(userargs) and
- self.is_ip_netns_cmd(userargs[2:]) and
- self.is_dnsmasq_cmd(userargs[6:])):
- return True
- return False
+class DeprecatedDnsmasqFilter(DnsmasqFilter):
+ """Variant of dnsmasq filter to support old-style FLAGFILE."""
+ CONFIG_FILE_ARG = 'FLAGFILE'
class KillFilter(CommandFilter):
"""Specific filter for the kill calls.
+
1st argument is the user to run /bin/kill under
2nd argument is the location of the affected executable
+ if the argument is not absolute, it is checked against $PATH
Subsequent arguments list the accepted signals (if any)
This filter relies on /proc to accurately determine affected
return False
try:
command = os.readlink("/proc/%d/exe" % int(args[1]))
- # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
- # the end if an executable is updated or deleted
- if command.endswith(" (deleted)"):
- command = command[:command.rindex(" ")]
- if command != self.args[0]:
- # Affected executable does not match
- return False
except (ValueError, OSError):
# Incorrect PID
return False
- return True
+
+ # NOTE(yufang521247): /proc/PID/exe may have '\0' on the
+ # end, because python doen't stop at '\0' when read the
+ # target path.
+ command = command.partition('\0')[0]
+
+ # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
+ # the end if an executable is updated or deleted
+ if command.endswith(" (deleted)"):
+ command = command[:-len(" (deleted)")]
+
+ kill_command = self.args[0]
+
+ if os.path.isabs(kill_command):
+ return kill_command == command
+
+ return (os.path.isabs(command) and
+ kill_command == os.path.basename(command) and
+ os.path.dirname(command) in os.environ['PATH'].split(':'))
class ReadFileFilter(CommandFilter):
def match(self, userargs):
if userargs[0] == 'ip':
if userargs[1] == 'netns':
- if userargs[2] in ('list', 'add', 'delete'):
- return True
- else:
- return False
+ return (userargs[2] in ('list', 'add', 'delete'))
else:
return True
-class IpNetnsExecFilter(ExecCommandFilter):
+class EnvFilter(CommandFilter):
+ """Specific filter for the env utility.
+
+ Behaves like CommandFilter, except that it handles
+ leading env A=B.. strings appropriately.
+ """
+
+ def _extract_env(self, arglist):
+ """Extract all leading NAME=VALUE arguments from arglist."""
+
+ envs = set()
+ for arg in arglist:
+ if '=' not in arg:
+ break
+ envs.add(arg.partition('=')[0])
+ return envs
+
+ def __init__(self, exec_path, run_as, *args):
+ super(EnvFilter, self).__init__(exec_path, run_as, *args)
+
+ env_list = self._extract_env(self.args)
+ # Set exec_path to X when args are in the form of
+ # env A=a B=b C=c X Y Z
+ if "env" in exec_path and len(env_list) < len(self.args):
+ self.exec_path = self.args[len(env_list)]
+
+ def match(self, userargs):
+ # ignore leading 'env'
+ if userargs[0] == 'env':
+ userargs.pop(0)
+
+ # require one additional argument after configured ones
+ if len(userargs) < len(self.args):
+ return False
+
+ # extract all env args
+ user_envs = self._extract_env(userargs)
+ filter_envs = self._extract_env(self.args)
+ user_command = userargs[len(user_envs):len(user_envs) + 1]
+
+ # match first non-env argument with CommandFilter
+ return (super(EnvFilter, self).match(user_command)
+ and len(filter_envs) and user_envs == filter_envs)
+
+ def exec_args(self, userargs):
+ args = userargs[:]
+
+ # ignore leading 'env'
+ if args[0] == 'env':
+ args.pop(0)
+
+ # Throw away leading NAME=VALUE arguments
+ while args and '=' in args[0]:
+ args.pop(0)
+
+ return args
+
+ def get_command(self, userargs, exec_dirs=[]):
+ to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
+ return [to_exec] + self.exec_args(userargs)[1:]
+
+ def get_environment(self, userargs):
+ env = os.environ.copy()
+
+ # ignore leading 'env'
+ if userargs[0] == 'env':
+ userargs.pop(0)
+
+ # Handle leading NAME=VALUE pairs
+ for a in userargs:
+ env_name, equals, env_value = a.partition('=')
+ if not equals:
+ break
+ if env_name and env_value:
+ env[env_name] = env_value
+
+ return env
+
+
+class ChainingFilter(CommandFilter):
+ def exec_args(self, userargs):
+ return []
+
+
+class IpNetnsExecFilter(ChainingFilter):
"""Specific filter for the ip utility to that does match exec."""
+
def match(self, userargs):
- if userargs[:3] == ['ip', 'netns', 'exec']:
- return True
- else:
+ # Network namespaces currently require root
+ # require <ns> argument
+ if self.run_as != "root" or len(userargs) < 4:
return False
+ return (userargs[:3] == ['ip', 'netns', 'exec'])
+
def exec_args(self, userargs):
args = userargs[4:]
if args:
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright (c) 2012 OpenStack Foundation.
+# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import os
import string
-from neutron.rootwrap import filters
+from neutron.openstack.common.rootwrap import filters
class NoFilterMatched(Exception):
def match_filter(filter_list, userargs, exec_dirs=[]):
- """Return first matched filter from command filters.
+ """Checks user command and arguments through command filters.
+
+ Returns the first matching filter.
- Checks user command and arguments through command filters and
- returns the first matching filter.
Raises NoFilterMatched if no filter matched.
Raises FilterMatchNotExecutable if no executable was found for the
best filter match.
for f in filter_list:
if f.match(userargs):
- if isinstance(f, filters.ExecCommandFilter):
+ if isinstance(f, filters.ChainingFilter):
# This command calls exec verify that remaining args
# matches another filter.
+ def non_chain_filter(fltr):
+ return (fltr.run_as == f.run_as
+ and not isinstance(fltr, filters.ChainingFilter))
+
leaf_filters = [fltr for fltr in filter_list
- if not isinstance(fltr,
- filters.ExecCommandFilter)]
+ if non_chain_filter(fltr)]
args = f.exec_args(userargs)
- if (not args or not
- match_filter(leaf_filters, args, exec_dirs=exec_dirs)):
+ if (not args or not match_filter(leaf_filters,
+ args, exec_dirs=exec_dirs)):
continue
# Try other filters if executable is absent
raise IndexError
expected = [
- 'NEUTRON_RELAY_SOCKET_PATH=/dhcp/lease_relay',
- 'NEUTRON_NETWORK_ID=cccccccc-cccc-cccc-cccc-cccccccccccc',
'ip',
'netns',
'exec',
'qdhcp-ns',
+ 'env',
+ 'NEUTRON_RELAY_SOCKET_PATH=/dhcp/lease_relay',
+ 'NEUTRON_NETWORK_ID=cccccccc-cccc-cccc-cccc-cccccccccccc',
'dnsmasq',
'--no-hosts',
'--no-resolv',
env = dict(FOO=1, BAR=2)
self.netns_cmd.execute(['ip', 'link', 'list'], env)
execute.assert_called_once_with(
- ['FOO=1', 'BAR=2', 'ip', 'netns', 'exec', 'ns', 'ip', 'link',
- 'list'],
+ ['ip', 'netns', 'exec', 'ns', 'env', 'FOO=1', 'BAR=2',
+ 'ip', 'link', 'list'],
root_helper='sudo', check_exit_code=True)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import ConfigParser
-import logging
-import logging.handlers
-import os
-import subprocess
-import uuid
-
-import fixtures
-
-from neutron.rootwrap import filters
-from neutron.rootwrap import wrapper
-from neutron.tests import base
-
-
-class RootwrapTestCase(base.BaseTestCase):
-
- def setUp(self):
- super(RootwrapTestCase, self).setUp()
- self.filters = [
- filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
- filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
- filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
- filters.CommandFilter("/nonexistent/cat", "root"),
- filters.CommandFilter("/bin/cat", "root") # Keep this one last
- ]
-
- def test_RegExpFilter_match(self):
- usercmd = ["ls", "/root"]
- filtermatch = wrapper.match_filter(self.filters, usercmd)
- self.assertFalse(filtermatch is None)
- self.assertEqual(filtermatch.get_command(usercmd),
- ["/bin/ls", "/root"])
-
- def test_RegExpFilter_reject(self):
- usercmd = ["ls", "root"]
- self.assertRaises(wrapper.NoFilterMatched,
- wrapper.match_filter, self.filters, usercmd)
-
- def test_missing_command(self):
- valid_but_missing = ["foo_bar_not_exist"]
- invalid = ["foo_bar_not_exist_and_not_matched"]
- self.assertRaises(wrapper.FilterMatchNotExecutable,
- wrapper.match_filter,
- self.filters, valid_but_missing)
- self.assertRaises(wrapper.NoFilterMatched,
- wrapper.match_filter, self.filters, invalid)
-
- def test_DnsmasqFilter(self):
- usercmd = ['NEUTRON_RELAY_SOCKET_PATH=A', 'NEUTRON_NETWORK_ID=foobar',
- 'dnsmasq', 'foo']
- f = filters.DnsmasqFilter("/usr/bin/dnsmasq", "root")
- self.assertTrue(f.match(usercmd))
- self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo'])
- env = f.get_environment(usercmd)
- self.assertEqual(env.get('NEUTRON_RELAY_SOCKET_PATH'), 'A')
- self.assertEqual(env.get('NEUTRON_NETWORK_ID'), 'foobar')
-
- def test_DnsmasqNetnsFilter(self):
- usercmd = ['NEUTRON_RELAY_SOCKET_PATH=A', 'NEUTRON_NETWORK_ID=foobar',
- 'ip', 'netns', 'exec', 'foo', 'dnsmasq', 'foo']
- f = filters.DnsmasqNetnsFilter("/sbin/ip", "root")
- self.assertTrue(f.match(usercmd))
- self.assertEqual(f.get_command(usercmd), ['/sbin/ip', 'netns', 'exec',
- 'foo', 'dnsmasq', 'foo'])
- env = f.get_environment(usercmd)
- self.assertEqual(env.get('NEUTRON_RELAY_SOCKET_PATH'), 'A')
- self.assertEqual(env.get('NEUTRON_NETWORK_ID'), 'foobar')
-
- def test_KillFilter(self):
- if not os.path.exists("/proc/%d" % os.getpid()):
- self.skipTest("Test requires /proc filesystem (procfs)")
- p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- try:
- f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
- f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
- usercmd = ['kill', '-ALRM', p.pid]
- # Incorrect signal should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- usercmd = ['kill', p.pid]
- # Providing no signal should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- # Providing matching signal should be allowed
- usercmd = ['kill', '-9', p.pid]
- self.assertTrue(f.match(usercmd) or f2.match(usercmd))
-
- f = filters.KillFilter("root", "/bin/cat")
- f2 = filters.KillFilter("root", "/usr/bin/cat")
- usercmd = ['kill', os.getpid()]
- # Our own PID does not match /bin/sleep, so it should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- usercmd = ['kill', 999999]
- # Nonexistent PID should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- usercmd = ['kill', p.pid]
- # Providing no signal should work
- self.assertTrue(f.match(usercmd) or f2.match(usercmd))
- finally:
- # Terminate the "cat" process and wait for it to finish
- p.terminate()
- p.wait()
-
- def test_KillFilter_no_raise(self):
- """Makes sure ValueError from bug 926412 is gone."""
- f = filters.KillFilter("root", "")
- # Providing anything other than kill should be False
- usercmd = ['notkill', 999999]
- self.assertFalse(f.match(usercmd))
- # Providing something that is not a pid should be False
- usercmd = ['kill', 'notapid']
- self.assertFalse(f.match(usercmd))
-
- def test_KillFilter_deleted_exe(self):
- """Makes sure deleted exe's are killed correctly."""
- # See bug #967931.
- def fake_readlink(blah):
- return '/bin/commandddddd (deleted)'
-
- f = filters.KillFilter("root", "/bin/commandddddd")
- usercmd = ['kill', 1234]
- # Providing no signal should work
- self.stubs.Set(os, 'readlink', fake_readlink)
- self.assertTrue(f.match(usercmd))
-
- def test_ReadFileFilter(self):
- goodfn = '/good/file.name'
- f = filters.ReadFileFilter(goodfn)
- usercmd = ['cat', '/bad/file']
- self.assertFalse(f.match(['cat', '/bad/file']))
- usercmd = ['cat', goodfn]
- self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
- self.assertTrue(f.match(usercmd))
-
- def test_IpFilter_non_netns(self):
- f = filters.IpFilter('/sbin/ip', 'root')
- self.assertTrue(f.match(['ip', 'link', 'list']))
-
- def _test_IpFilter_netns_helper(self, action):
- f = filters.IpFilter('/sbin/ip', 'root')
- self.assertTrue(f.match(['ip', 'link', action]))
-
- def test_IpFilter_netns_add(self):
- self._test_IpFilter_netns_helper('add')
-
- def test_IpFilter_netns_delete(self):
- self._test_IpFilter_netns_helper('delete')
-
- def test_IpFilter_netns_list(self):
- self._test_IpFilter_netns_helper('list')
-
- def test_IpNetnsExecFilter_match(self):
- f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
- self.assertTrue(
- f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
-
- def test_IpNetnsExecFilter_nomatch(self):
- f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
- self.assertFalse(f.match(['ip', 'link', 'list']))
-
- def test_match_filter_recurses_exec_command_filter_matches(self):
- filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
- filters.IpFilter('/sbin/ip', 'root')]
- args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
-
- self.assertIsNotNone(wrapper.match_filter(filter_list, args))
-
- def test_match_filter_recurses_exec_command_filter_does_not_match(self):
- filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
- filters.IpFilter('/sbin/ip', 'root')]
- args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
- 'ip', 'link', 'list']
-
- self.assertRaises(wrapper.NoFilterMatched,
- wrapper.match_filter, filter_list, args)
-
- def test_exec_dirs_search(self):
- # This test supposes you have /bin/cat or /usr/bin/cat locally
- f = filters.CommandFilter("cat", "root")
- usercmd = ['cat', '/f']
- self.assertTrue(f.match(usercmd))
- self.assertTrue(f.get_command(usercmd,
- exec_dirs=['/bin', '/usr/bin'])
- in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
-
- def test_skips(self):
- # Check that all filters are skipped and that the last matches
- usercmd = ["cat", "/"]
- filtermatch = wrapper.match_filter(self.filters, usercmd)
- self.assertTrue(filtermatch is self.filters[-1])
-
- def test_RootwrapConfig(self):
- raw = ConfigParser.RawConfigParser()
-
- # Empty config should raise ConfigParser.Error
- self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw)
-
- # Check default values
- raw.set('DEFAULT', 'filters_path', '/a,/b')
- config = wrapper.RootwrapConfig(raw)
- self.assertEqual(config.filters_path, ['/a', '/b'])
- self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':'))
- self.assertFalse(config.use_syslog)
- self.assertEqual(config.syslog_log_facility,
- logging.handlers.SysLogHandler.LOG_SYSLOG)
- self.assertEqual(config.syslog_log_level, logging.ERROR)
-
- # Check general values
- raw.set('DEFAULT', 'exec_dirs', '/a,/x')
- config = wrapper.RootwrapConfig(raw)
- self.assertEqual(config.exec_dirs, ['/a', '/x'])
-
- raw.set('DEFAULT', 'use_syslog', 'oui')
- self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
- raw.set('DEFAULT', 'use_syslog', 'true')
- config = wrapper.RootwrapConfig(raw)
- self.assertTrue(config.use_syslog)
-
- raw.set('DEFAULT', 'syslog_log_facility', 'moo')
- self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
- raw.set('DEFAULT', 'syslog_log_facility', 'local0')
- config = wrapper.RootwrapConfig(raw)
- self.assertEqual(config.syslog_log_facility,
- logging.handlers.SysLogHandler.LOG_LOCAL0)
- raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
- config = wrapper.RootwrapConfig(raw)
- self.assertEqual(config.syslog_log_facility,
- logging.handlers.SysLogHandler.LOG_AUTH)
-
- raw.set('DEFAULT', 'syslog_log_level', 'bar')
- self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
- raw.set('DEFAULT', 'syslog_log_level', 'INFO')
- config = wrapper.RootwrapConfig(raw)
- self.assertEqual(config.syslog_log_level, logging.INFO)
-
-
-class PathFilterTestCase(base.BaseTestCase):
- def setUp(self):
- super(PathFilterTestCase, self).setUp()
-
- tmpdir = fixtures.TempDir('/tmp')
- self.useFixture(tmpdir)
-
- self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
-
- gen_name = lambda: str(uuid.uuid4())
-
- self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
- self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some')
- self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
- 'some')
- self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
-
- self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
- gen_name())
- os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
- self.TRAVERSAL_SYMLINK_WITHIN_DIR)
-
- self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
- gen_name())
- os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
- self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
-
- self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
- os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
-
- self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
- os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR)
-
- def test_argument_pass_constraint(self):
- f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
-
- args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
- self.assertTrue(f.match(args))
-
- def test_argument_equality_constraint(self):
- f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs')
-
- args = ['chown', 'nova', '/tmp/spam/eggs']
- self.assertTrue(f.match(args))
-
- args = ['chown', 'neutron', '/tmp/spam/eggs']
- self.assertFalse(f.match(args))
-
- def test_wrong_arguments_number(self):
- args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
- self.assertFalse(self.f.match(args))
-
- def test_wrong_exec_command(self):
- args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
- self.assertFalse(self.f.match(args))
-
- def test_match(self):
- args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
- self.assertTrue(self.f.match(args))
-
- def test_match_traversal(self):
- args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
- self.assertTrue(self.f.match(args))
-
- def test_match_symlink(self):
- args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
- self.assertTrue(self.f.match(args))
-
- def test_match_traversal_symlink(self):
- args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
- self.assertTrue(self.f.match(args))
-
- def test_reject(self):
- args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
- self.assertFalse(self.f.match(args))
-
- def test_reject_traversal(self):
- args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
- self.assertFalse(self.f.match(args))
-
- def test_reject_symlink(self):
- args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
- self.assertFalse(self.f.match(args))
-
- def test_reject_traversal_symlink(self):
- args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
- self.assertFalse(self.f.match(args))
-
- def test_get_command(self):
- args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
- expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
-
- self.assertEqual(expected, self.f.get_command(args))
-
- def test_get_command_traversal(self):
- args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
- expected = ['/bin/chown', 'nova',
- os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
-
- self.assertEqual(expected, self.f.get_command(args))
-
- def test_get_command_symlink(self):
- args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
- expected = ['/bin/chown', 'nova',
- os.path.realpath(self.SYMLINK_WITHIN_DIR)]
-
- self.assertEqual(expected, self.f.get_command(args))
-
- def test_get_command_traversal_symlink(self):
- args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
- expected = ['/bin/chown', 'nova',
- os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
-
- self.assertEqual(expected, self.f.get_command(args))