node.
"""
+from __future__ import print_function
+
import ConfigParser
+import logging
import os
+import pwd
import signal
+import subprocess
import sys
-from quantum.common import utils
-
RC_UNAUTHORIZED = 99
RC_NOCOMMAND = 98
RC_BADCONFIG = 97
+RC_NOEXECFOUND = 96
+
+
+def _subprocess_setup():
+ # Python installs a SIGPIPE handler by default. This is usually not what
+ # non-Python subprocesses expect.
+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def _exit_error(execname, message, errorcode, log=True):
+ print("%s: %s" % (execname, message))
+ if log:
+ logging.error(message)
+ sys.exit(errorcode)
if __name__ == '__main__':
# Split arguments, require at least a command
execname = sys.argv.pop(0)
- # argv[0] required; path to conf file
if len(sys.argv) < 2:
- print "%s: %s" % (execname, "No command specified")
- sys.exit(RC_NOCOMMAND)
+ _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
configfile = sys.argv.pop(0)
userargs = sys.argv[:]
- # Load configuration
- config = ConfigParser.RawConfigParser()
- config.read(configfile)
- try:
- filters_path = config.get("DEFAULT", "filters_path").split(",")
- filters = None
- except ConfigParser.Error:
- print "%s: Incorrect configuration file: %s" % (execname, configfile)
- sys.exit(RC_BADCONFIG)
-
# Add ../ to sys.path to allow running from branch
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
os.pardir, os.pardir))
from quantum.rootwrap import wrapper
+ # Load configuration
+ try:
+ rawconfig = ConfigParser.RawConfigParser()
+ rawconfig.read(configfile)
+ config = wrapper.RootwrapConfig(rawconfig)
+ except ValueError as exc:
+ msg = "Incorrect value in %s: %s" % (configfile, exc.message)
+ _exit_error(execname, msg, RC_BADCONFIG, log=False)
+ except ConfigParser.Error:
+ _exit_error(execname, "Incorrect configuration file: %s" % configfile,
+ RC_BADCONFIG, log=False)
+
+ if config.use_syslog:
+ wrapper.setup_syslog(execname,
+ config.syslog_log_facility,
+ config.syslog_log_level)
+
# Execute command if it matches any of the loaded filters
- filters = wrapper.load_filters(filters_path)
- filtermatch = wrapper.match_filter(filters, userargs)
- if filtermatch:
- obj = utils.subprocess_popen(filtermatch.get_command(userargs),
- stdin=sys.stdin,
- stdout=sys.stdout,
- stderr=sys.stderr,
- env=filtermatch.get_environment(userargs))
- obj.wait()
- sys.exit(obj.returncode)
-
- print "Unauthorized command: %s" % ' '.join(userargs)
- sys.exit(RC_UNAUTHORIZED)
+ filters = wrapper.load_filters(config.filters_path)
+ try:
+ filtermatch = wrapper.match_filter(filters, userargs,
+ exec_dirs=config.exec_dirs)
+ if filtermatch:
+ command = filtermatch.get_command(userargs,
+ exec_dirs=config.exec_dirs)
+ if config.use_syslog:
+ logging.info("(%s > %s) Executing %s (filter match = %s)" % (
+ os.getlogin(), pwd.getpwuid(os.getuid())[0],
+ command, filtermatch.name))
+
+ obj = subprocess.Popen(command,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ preexec_fn=_subprocess_setup,
+ env=filtermatch.get_environment(userargs))
+ obj.wait()
+ sys.exit(obj.returncode)
+
+ except wrapper.FilterMatchNotExecutable as exc:
+ msg = ("Executable not found: %s (filter match = %s)"
+ % (exc.match.exec_path, exc.match.name))
+ _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
+
+ except wrapper.NoFilterMatched:
+ msg = ("Unauthorized command: %s (no filter matched)"
+ % ' '.join(userargs))
+ _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
# License for the specific language governing permissions and limitations
# under the License.
-
import os
import re
"""Command filter only checking that the 1st argument matches exec_path."""
def __init__(self, exec_path, run_as, *args):
+ self.name = ''
self.exec_path = exec_path
self.run_as = run_as
self.args = args
+ self.real_exec = None
+
+ def get_exec(self, exec_dirs=[]):
+ """Returns existing executable, or empty string if none found."""
+ if self.real_exec is not None:
+ return self.real_exec
+ self.real_exec = ""
+ if self.exec_path.startswith('/'):
+ if os.access(self.exec_path, os.X_OK):
+ self.real_exec = self.exec_path
+ else:
+ for binary_path in exec_dirs:
+ expanded_path = os.path.join(binary_path, self.exec_path)
+ if os.access(expanded_path, os.X_OK):
+ self.real_exec = expanded_path
+ break
+ return self.real_exec
def match(self, userargs):
"""Only check that the first argument (command) matches exec_path."""
return os.path.basename(self.exec_path) == userargs[0]
- def get_command(self, userargs):
+ def get_command(self, userargs, exec_dirs=[]):
"""Returns command to execute (with sudo -u if run_as != root)."""
+ to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
if (self.run_as != 'root'):
# Used to run commands at lesser privileges
- return ['sudo', '-u', self.run_as, self.exec_path] + userargs[1:]
- return [self.exec_path] + userargs[1:]
+ return ['sudo', '-u', self.run_as, to_exec] + userargs[1:]
+ return [to_exec] + userargs[1:]
def get_environment(self, userargs):
"""Returns specific environment to set, None if none."""
return False
+class PathFilter(CommandFilter):
+ """Command filter checking that path arguments are within given dirs
+
+ One can specify the following constraints for command arguments:
+ 1) pass - pass an argument as is to the resulting command
+ 2) some_str - check if an argument is equal to the given string
+ 3) abs path - check if a path argument is within the given base dir
+
+ A typical rootwrapper filter entry looks like this:
+ # cmdname: filter name, raw command, user, arg_i_constraint [, ...]
+ chown: PathFilter, /bin/chown, root, nova, /var/lib/images
+
+ """
+
+ def match(self, userargs):
+ command, arguments = userargs[0], userargs[1:]
+
+ equal_args_num = len(self.args) == len(arguments)
+ exec_is_valid = super(PathFilter, self).match(userargs)
+ args_equal_or_pass = all(
+ arg == 'pass' or arg == value
+ for arg, value in zip(self.args, arguments)
+ if not os.path.isabs(arg) # arguments not specifying abs paths
+ )
+ paths_are_within_base_dirs = all(
+ os.path.commonprefix([arg, os.path.realpath(value)]) == arg
+ for arg, value in zip(self.args, arguments)
+ if os.path.isabs(arg) # arguments specifying abs paths
+ )
+
+ return (equal_args_num and
+ exec_is_valid and
+ args_equal_or_pass and
+ paths_are_within_base_dirs)
+
+ def get_command(self, userargs, exec_dirs=[]):
+ command, arguments = userargs[0], userargs[1:]
+
+ # convert path values to canonical ones; copy other args as is
+ args = [os.path.realpath(value) if os.path.isabs(arg) else value
+ for arg, value in zip(self.args, arguments)]
+
+ return super(PathFilter, self).get_command([command] + args,
+ exec_dirs)
+
+
class DnsmasqFilter(CommandFilter):
"""Specific filter for the dnsmasq call (which includes env)."""
return True
return False
- def get_command(self, userargs):
- return [self.exec_path] + userargs[3:]
+ def get_command(self, userargs, exec_dirs=[]):
+ to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
+ return [to_exec] + userargs[3:]
def get_environment(self, userargs):
env = os.environ.copy()
return False
args = list(userargs)
if len(args) == 3:
- # this means we're asking for a specific signal
+ # A specific signal is requested
signal = args.pop(1)
if signal not in self.args[1:]:
# Requested signal not in accepted list
if len(self.args) > 1:
# No signal requested, but filter requires specific signal
return False
-
try:
command = os.readlink("/proc/%d/exe" % int(args[1]))
# NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
if command.endswith(" (deleted)"):
command = command[:command.rindex(" ")]
if command != self.args[0]:
- # Affected executable doesn't match
+ # Affected executable does not match
return False
except (ValueError, OSError):
# Incorrect PID
import ConfigParser
+import logging
+import logging.handlers
import os
import string
-# this import has the effect of defining global var "filters",
-# referenced by build_filter(), below. It gets set up by
-# quantum-rootwrap, when we load_filters().
from quantum.rootwrap import filters
+class NoFilterMatched(Exception):
+ """This exception is raised when no filter matched."""
+ pass
+
+
+class FilterMatchNotExecutable(Exception):
+ """
+ This exception is raised when a filter matched but no executable was
+ found.
+ """
+ def __init__(self, match=None, **kwargs):
+ self.match = match
+
+
+class RootwrapConfig(object):
+
+ def __init__(self, config):
+ # filters_path
+ self.filters_path = config.get("DEFAULT", "filters_path").split(",")
+
+ # exec_dirs
+ if config.has_option("DEFAULT", "exec_dirs"):
+ self.exec_dirs = config.get("DEFAULT", "exec_dirs").split(",")
+ else:
+ # Use system PATH if exec_dirs is not specified
+ self.exec_dirs = os.environ["PATH"].split(':')
+
+ # syslog_log_facility
+ if config.has_option("DEFAULT", "syslog_log_facility"):
+ v = config.get("DEFAULT", "syslog_log_facility")
+ facility_names = logging.handlers.SysLogHandler.facility_names
+ self.syslog_log_facility = getattr(logging.handlers.SysLogHandler,
+ v, None)
+ if self.syslog_log_facility is None and v in facility_names:
+ self.syslog_log_facility = facility_names.get(v)
+ if self.syslog_log_facility is None:
+ raise ValueError('Unexpected syslog_log_facility: %s' % v)
+ else:
+ default_facility = logging.handlers.SysLogHandler.LOG_SYSLOG
+ self.syslog_log_facility = default_facility
+
+ # syslog_log_level
+ if config.has_option("DEFAULT", "syslog_log_level"):
+ v = config.get("DEFAULT", "syslog_log_level")
+ self.syslog_log_level = logging.getLevelName(v.upper())
+ if (self.syslog_log_level == "Level %s" % v.upper()):
+ raise ValueError('Unexepected syslog_log_level: %s' % v)
+ else:
+ self.syslog_log_level = logging.ERROR
+
+ # use_syslog
+ if config.has_option("DEFAULT", "use_syslog"):
+ self.use_syslog = config.getboolean("DEFAULT", "use_syslog")
+ else:
+ self.use_syslog = False
+
+
+def setup_syslog(execname, facility, level):
+ rootwrap_logger = logging.getLogger()
+ rootwrap_logger.setLevel(level)
+ handler = logging.handlers.SysLogHandler(address='/dev/log',
+ facility=facility)
+ handler.setFormatter(logging.Formatter(
+ os.path.basename(execname) + ': %(message)s'))
+ rootwrap_logger.addHandler(handler)
+
+
def build_filter(class_name, *args):
"""Returns a filter object of class class_name."""
if not hasattr(filters, class_name):
- # TODO(jrd): Log the error (whenever quantum-rootwrap has a log file)
+ logging.warning("Skipping unknown filter class (%s) specified "
+ "in filter definitions" % class_name)
return None
filterclass = getattr(filters, class_name)
return filterclass(*args)
newfilter = build_filter(*filterdefinition)
if newfilter is None:
continue
+ newfilter.name = name
filterlist.append(newfilter)
return filterlist
-def match_filter(filter_list, userargs):
+def match_filter(filter_list, userargs, exec_dirs=[]):
"""
Checks user command and arguments through command filters and
- returns the first matching filter, or None is none matched.
+ returns the first matching filter.
+ Raises NoFilterMatched if no filter matched.
+ Raises FilterMatchNotExecutable if no executable was found for the
+ best filter match.
"""
-
- found_filter = None
+ first_not_executable_filter = None
for f in filter_list:
if f.match(userargs):
if not isinstance(fltr,
filters.ExecCommandFilter)]
args = f.exec_args(userargs)
- if not args or not match_filter(leaf_filters, args):
+ if (not args or not
+ match_filter(leaf_filters, args, exec_dirs=exec_dirs)):
continue
# Try other filters if executable is absent
- if not os.access(f.exec_path, os.X_OK):
- if not found_filter:
- found_filter = f
+ if not f.get_exec(exec_dirs=exec_dirs):
+ if not first_not_executable_filter:
+ first_not_executable_filter = f
continue
# Otherwise return matching filter for execution
return f
- # No filter matched or first missing executable
- return found_filter
+ if first_not_executable_filter:
+ # A filter matched, but no executable was found for it
+ raise FilterMatchNotExecutable(match=first_not_executable_filter)
+
+ # No filter matched
+ raise NoFilterMatched()
# License for the specific language governing permissions and limitations
# under the License.
+import ConfigParser
+import logging
+import logging.handlers
import os
+import subprocess
+import uuid
-import mock
+import fixtures
-from quantum.common import utils
from quantum.rootwrap import filters
from quantum.rootwrap import wrapper
from quantum.tests import base
filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
- filters.CommandFilter("/nonexistant/cat", "root"),
- filters.CommandFilter("/bin/cat", "root")] # Keep this one last
-
- def tearDown(self):
- super(RootwrapTestCase, self).tearDown()
+ filters.CommandFilter("/nonexistent/cat", "root"),
+ filters.CommandFilter("/bin/cat", "root") # Keep this one last
+ ]
def test_RegExpFilter_match(self):
usercmd = ["ls", "/root"]
def test_RegExpFilter_reject(self):
usercmd = ["ls", "root"]
- filtermatch = wrapper.match_filter(self.filters, usercmd)
- self.assertTrue(filtermatch is None)
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, self.filters, usercmd)
def test_missing_command(self):
valid_but_missing = ["foo_bar_not_exist"]
invalid = ["foo_bar_not_exist_and_not_matched"]
- filtermatch = wrapper.match_filter(self.filters, valid_but_missing)
- self.assertTrue(filtermatch is not None)
- filtermatch = wrapper.match_filter(self.filters, invalid)
- self.assertTrue(filtermatch is None)
+ self.assertRaises(wrapper.FilterMatchNotExecutable,
+ wrapper.match_filter,
+ self.filters, valid_but_missing)
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, self.filters, invalid)
def test_DnsmasqFilter(self):
usercmd = ['QUANTUM_RELAY_SOCKET_PATH=A', 'QUANTUM_NETWORK_ID=foobar',
self.assertEqual(env.get('QUANTUM_NETWORK_ID'), 'foobar')
def test_KillFilter(self):
- p = utils.subprocess_popen(["/bin/sleep", "5"])
- f = filters.KillFilter("root", "/bin/sleep", "-9", "-HUP")
- f2 = filters.KillFilter("root", "/usr/bin/sleep", "-9", "-HUP")
- usercmd = ['kill', '-ALRM', p.pid]
- # Incorrect signal should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- usercmd = ['kill', p.pid]
- # Providing no signal should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- # Providing matching signal should be allowed
- usercmd = ['kill', '-9', p.pid]
- self.assertTrue(f.match(usercmd) or f2.match(usercmd))
-
- f = filters.KillFilter("root", "/bin/sleep")
- f2 = filters.KillFilter("root", "/usr/bin/sleep")
- usercmd = ['kill', os.getpid()]
- # Our own PID does not match /bin/sleep, so it should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- usercmd = ['kill', 999999]
- # Nonexistant PID should fail
- self.assertFalse(f.match(usercmd) or f2.match(usercmd))
- usercmd = ['kill', p.pid]
- # Providing no signal should work
- self.assertTrue(f.match(usercmd) or f2.match(usercmd))
+ if not os.path.exists("/proc/%d" % os.getpid()):
+ self.skipTest("Test requires /proc filesystem (procfs)")
+ p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ try:
+ f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
+ f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
+ usercmd = ['kill', '-ALRM', p.pid]
+ # Incorrect signal should fail
+ self.assertFalse(f.match(usercmd) or f2.match(usercmd))
+ usercmd = ['kill', p.pid]
+ # Providing no signal should fail
+ self.assertFalse(f.match(usercmd) or f2.match(usercmd))
+ # Providing matching signal should be allowed
+ usercmd = ['kill', '-9', p.pid]
+ self.assertTrue(f.match(usercmd) or f2.match(usercmd))
+
+ f = filters.KillFilter("root", "/bin/cat")
+ f2 = filters.KillFilter("root", "/usr/bin/cat")
+ usercmd = ['kill', os.getpid()]
+ # Our own PID does not match /bin/sleep, so it should fail
+ self.assertFalse(f.match(usercmd) or f2.match(usercmd))
+ usercmd = ['kill', 999999]
+ # Nonexistent PID should fail
+ self.assertFalse(f.match(usercmd) or f2.match(usercmd))
+ usercmd = ['kill', p.pid]
+ # Providing no signal should work
+ self.assertTrue(f.match(usercmd) or f2.match(usercmd))
+ finally:
+ # Terminate the "cat" process and wait for it to finish
+ p.terminate()
+ p.wait()
def test_KillFilter_no_raise(self):
"""Makes sure ValueError from bug 926412 is gone."""
def test_KillFilter_deleted_exe(self):
"""Makes sure deleted exe's are killed correctly."""
- # See bug #1073768.
- with mock.patch('os.readlink') as mock_readlink:
- mock_readlink.return_value = '/bin/commandddddd (deleted)'
- f = filters.KillFilter("root", "/bin/commandddddd")
- usercmd = ['kill', 1234]
- self.assertTrue(f.match(usercmd))
- mock_readlink.assert_called_once_with("/proc/1234/exe")
+ # See bug #967931.
+ def fake_readlink(blah):
+ return '/bin/commandddddd (deleted)'
+
+ f = filters.KillFilter("root", "/bin/commandddddd")
+ usercmd = ['kill', 1234]
+ # Providing no signal should work
+ self.stubs.Set(os, 'readlink', fake_readlink)
+ self.assertTrue(f.match(usercmd))
def test_ReadFileFilter(self):
goodfn = '/good/file.name'
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
'ip', 'link', 'list']
- self.assertIsNone(wrapper.match_filter(filter_list, args))
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, filter_list, args)
+
+ def test_exec_dirs_search(self):
+ # This test supposes you have /bin/cat or /usr/bin/cat locally
+ f = filters.CommandFilter("cat", "root")
+ usercmd = ['cat', '/f']
+ self.assertTrue(f.match(usercmd))
+ self.assertTrue(f.get_command(usercmd,
+ exec_dirs=['/bin', '/usr/bin'])
+ in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
def test_skips(self):
# Check that all filters are skipped and that the last matches
usercmd = ["cat", "/"]
filtermatch = wrapper.match_filter(self.filters, usercmd)
self.assertTrue(filtermatch is self.filters[-1])
+
+ def test_RootwrapConfig(self):
+ raw = ConfigParser.RawConfigParser()
+
+ # Empty config should raise ConfigParser.Error
+ self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw)
+
+ # Check default values
+ raw.set('DEFAULT', 'filters_path', '/a,/b')
+ config = wrapper.RootwrapConfig(raw)
+ self.assertEqual(config.filters_path, ['/a', '/b'])
+ self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':'))
+ self.assertFalse(config.use_syslog)
+ self.assertEqual(config.syslog_log_facility,
+ logging.handlers.SysLogHandler.LOG_SYSLOG)
+ self.assertEqual(config.syslog_log_level, logging.ERROR)
+
+ # Check general values
+ raw.set('DEFAULT', 'exec_dirs', '/a,/x')
+ config = wrapper.RootwrapConfig(raw)
+ self.assertEqual(config.exec_dirs, ['/a', '/x'])
+
+ raw.set('DEFAULT', 'use_syslog', 'oui')
+ self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
+ raw.set('DEFAULT', 'use_syslog', 'true')
+ config = wrapper.RootwrapConfig(raw)
+ self.assertTrue(config.use_syslog)
+
+ raw.set('DEFAULT', 'syslog_log_facility', 'moo')
+ self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
+ raw.set('DEFAULT', 'syslog_log_facility', 'local0')
+ config = wrapper.RootwrapConfig(raw)
+ self.assertEqual(config.syslog_log_facility,
+ logging.handlers.SysLogHandler.LOG_LOCAL0)
+ raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
+ config = wrapper.RootwrapConfig(raw)
+ self.assertEqual(config.syslog_log_facility,
+ logging.handlers.SysLogHandler.LOG_AUTH)
+
+ raw.set('DEFAULT', 'syslog_log_level', 'bar')
+ self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
+ raw.set('DEFAULT', 'syslog_log_level', 'INFO')
+ config = wrapper.RootwrapConfig(raw)
+ self.assertEqual(config.syslog_log_level, logging.INFO)
+
+
+class PathFilterTestCase(base.BaseTestCase):
+ def setUp(self):
+ super(PathFilterTestCase, self).setUp()
+
+ tmpdir = fixtures.TempDir('/tmp')
+ self.useFixture(tmpdir)
+
+ self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
+
+ gen_name = lambda: str(uuid.uuid4())
+
+ self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
+ self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some')
+ self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
+ 'some')
+ self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
+
+ self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
+ gen_name())
+ os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
+ self.TRAVERSAL_SYMLINK_WITHIN_DIR)
+
+ self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
+ gen_name())
+ os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
+ self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
+
+ self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
+ os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
+
+ self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
+ os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR)
+
+ def test_argument_pass_constraint(self):
+ f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
+
+ args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
+ self.assertTrue(f.match(args))
+
+ def test_argument_equality_constraint(self):
+ f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs')
+
+ args = ['chown', 'nova', '/tmp/spam/eggs']
+ self.assertTrue(f.match(args))
+
+ args = ['chown', 'quantum', '/tmp/spam/eggs']
+ self.assertFalse(f.match(args))
+
+ def test_wrong_arguments_number(self):
+ args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
+ self.assertFalse(self.f.match(args))
+
+ def test_wrong_exec_command(self):
+ args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
+ self.assertFalse(self.f.match(args))
+
+ def test_match(self):
+ args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
+ self.assertTrue(self.f.match(args))
+
+ def test_match_traversal(self):
+ args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
+ self.assertTrue(self.f.match(args))
+
+ def test_match_symlink(self):
+ args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
+ self.assertTrue(self.f.match(args))
+
+ def test_match_traversal_symlink(self):
+ args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
+ self.assertTrue(self.f.match(args))
+
+ def test_reject(self):
+ args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
+ self.assertFalse(self.f.match(args))
+
+ def test_reject_traversal(self):
+ args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
+ self.assertFalse(self.f.match(args))
+
+ def test_reject_symlink(self):
+ args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
+ self.assertFalse(self.f.match(args))
+
+ def test_reject_traversal_symlink(self):
+ args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
+ self.assertFalse(self.f.match(args))
+
+ def test_get_command(self):
+ args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
+ expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
+
+ self.assertEqual(expected, self.f.get_command(args))
+
+ def test_get_command_traversal(self):
+ args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
+ expected = ['/bin/chown', 'nova',
+ os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
+
+ self.assertEqual(expected, self.f.get_command(args))
+
+ def test_get_command_symlink(self):
+ args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
+ expected = ['/bin/chown', 'nova',
+ os.path.realpath(self.SYMLINK_WITHIN_DIR)]
+
+ self.assertEqual(expected, self.f.get_command(args))
+
+ def test_get_command_traversal_symlink(self):
+ args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
+ expected = ['/bin/chown', 'nova',
+ os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
+
+ self.assertEqual(expected, self.f.get_command(args))