if self.real_exec is not None:
return self.real_exec
self.real_exec = ""
- if self.exec_path.startswith('/'):
+ if os.path.isabs(self.exec_path):
if os.access(self.exec_path, os.X_OK):
self.real_exec = self.exec_path
else:
def match(self, userargs):
"""Only check that the first argument (command) matches exec_path."""
- return os.path.basename(self.exec_path) == userargs[0]
+ return userargs and os.path.basename(self.exec_path) == userargs[0]
def get_command(self, userargs, exec_dirs=[]):
"""Returns command to execute (with sudo -u if run_as != root)."""
def match(self, userargs):
# Early skip if command or number of args don't match
- if (len(self.args) != len(userargs)):
+ if (not userargs or len(self.args) != len(userargs)):
# DENY: argument numbers don't match
return False
# Compare each arg (anchoring pattern explicitly at end of string)
"""
def match(self, userargs):
+ if not userargs or len(userargs) < 2:
+ return False
+
command, arguments = userargs[0], userargs[1:]
equal_args_num = len(self.args) == len(arguments)
exec_dirs)
-class DnsmasqFilter(CommandFilter):
- """Specific filter for the dnsmasq call (which includes env)."""
-
- CONFIG_FILE_ARG = 'CONFIG_FILE'
-
- def match(self, userargs):
- if (userargs[0] == 'env' and
- userargs[1].startswith(self.CONFIG_FILE_ARG) and
- userargs[2].startswith('NETWORK_ID=') and
- userargs[3] == 'dnsmasq'):
- return True
- return False
-
- def get_command(self, userargs, exec_dirs=[]):
- to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
- dnsmasq_pos = userargs.index('dnsmasq')
- return [to_exec] + userargs[dnsmasq_pos + 1:]
-
- def get_environment(self, userargs):
- env = os.environ.copy()
- env[self.CONFIG_FILE_ARG] = userargs[1].split('=')[-1]
- env['NETWORK_ID'] = userargs[2].split('=')[-1]
- return env
-
-
-class DeprecatedDnsmasqFilter(DnsmasqFilter):
- """Variant of dnsmasq filter to support old-style FLAGFILE."""
- CONFIG_FILE_ARG = 'FLAGFILE'
-
-
class KillFilter(CommandFilter):
"""Specific filter for the kill calls.
+
1st argument is the user to run /bin/kill under
2nd argument is the location of the affected executable
+ if the argument is not absolute, it is checked against $PATH
Subsequent arguments list the accepted signals (if any)
This filter relies on /proc to accurately determine affected
super(KillFilter, self).__init__("/bin/kill", *args)
def match(self, userargs):
- if userargs[0] != "kill":
+ if not userargs or userargs[0] != "kill":
return False
args = list(userargs)
if len(args) == 3:
return False
try:
command = os.readlink("/proc/%d/exe" % int(args[1]))
- # NOTE(yufang521247): /proc/PID/exe may have '\0' on the
- # end, because python doen't stop at '\0' when read the
- # target path.
- command = command.split('\0')[0]
- # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
- # the end if an executable is updated or deleted
- if command.endswith(" (deleted)"):
- command = command[:command.rindex(" ")]
- if command != self.args[0]:
- # Affected executable does not match
- return False
except (ValueError, OSError):
# Incorrect PID
return False
- return True
+
+ # NOTE(yufang521247): /proc/PID/exe may have '\0' on the
+ # end, because python doen't stop at '\0' when read the
+ # target path.
+ command = command.partition('\0')[0]
+
+ # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
+ # the end if an executable is updated or deleted
+ if command.endswith(" (deleted)"):
+ command = command[:-len(" (deleted)")]
+
+ kill_command = self.args[0]
+
+ if os.path.isabs(kill_command):
+ return kill_command == command
+
+ return (os.path.isabs(command) and
+ kill_command == os.path.basename(command) and
+ os.path.dirname(command) in os.environ.get('PATH', ''
+ ).split(':'))
class ReadFileFilter(CommandFilter):
super(ReadFileFilter, self).__init__("/bin/cat", "root", *args)
def match(self, userargs):
- if userargs[0] != 'cat':
- return False
- if userargs[1] != self.file_path:
+ return (userargs == ['cat', self.file_path])
+
+
+class IpFilter(CommandFilter):
+ """Specific filter for the ip utility to that does not match exec."""
+
+ def match(self, userargs):
+ if userargs[0] == 'ip':
+ if userargs[1] == 'netns':
+ return (userargs[2] in ('list', 'add', 'delete'))
+ else:
+ return True
+
+
+class EnvFilter(CommandFilter):
+ """Specific filter for the env utility.
+
+ Behaves like CommandFilter, except that it handles
+ leading env A=B.. strings appropriately.
+ """
+
+ def _extract_env(self, arglist):
+ """Extract all leading NAME=VALUE arguments from arglist."""
+
+ envs = set()
+ for arg in arglist:
+ if '=' not in arg:
+ break
+ envs.add(arg.partition('=')[0])
+ return envs
+
+ def __init__(self, exec_path, run_as, *args):
+ super(EnvFilter, self).__init__(exec_path, run_as, *args)
+
+ env_list = self._extract_env(self.args)
+ # Set exec_path to X when args are in the form of
+ # env A=a B=b C=c X Y Z
+ if "env" in exec_path and len(env_list) < len(self.args):
+ self.exec_path = self.args[len(env_list)]
+
+ def match(self, userargs):
+ # ignore leading 'env'
+ if userargs[0] == 'env':
+ userargs.pop(0)
+
+ # require one additional argument after configured ones
+ if len(userargs) < len(self.args):
return False
- if len(userargs) != 2:
+
+ # extract all env args
+ user_envs = self._extract_env(userargs)
+ filter_envs = self._extract_env(self.args)
+ user_command = userargs[len(user_envs):len(user_envs) + 1]
+
+ # match first non-env argument with CommandFilter
+ return (super(EnvFilter, self).match(user_command)
+ and len(filter_envs) and user_envs == filter_envs)
+
+ def exec_args(self, userargs):
+ args = userargs[:]
+
+ # ignore leading 'env'
+ if args[0] == 'env':
+ args.pop(0)
+
+ # Throw away leading NAME=VALUE arguments
+ while args and '=' in args[0]:
+ args.pop(0)
+
+ return args
+
+ def get_command(self, userargs, exec_dirs=[]):
+ to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
+ return [to_exec] + self.exec_args(userargs)[1:]
+
+ def get_environment(self, userargs):
+ env = os.environ.copy()
+
+ # ignore leading 'env'
+ if userargs[0] == 'env':
+ userargs.pop(0)
+
+ # Handle leading NAME=VALUE pairs
+ for a in userargs:
+ env_name, equals, env_value = a.partition('=')
+ if not equals:
+ break
+ if env_name and env_value:
+ env[env_name] = env_value
+
+ return env
+
+
+class ChainingFilter(CommandFilter):
+ def exec_args(self, userargs):
+ return []
+
+
+class IpNetnsExecFilter(ChainingFilter):
+ """Specific filter for the ip utility to that does match exec."""
+
+ def match(self, userargs):
+ # Network namespaces currently require root
+ # require <ns> argument
+ if self.run_as != "root" or len(userargs) < 4:
return False
- return True
+
+ return (userargs[:3] == ['ip', 'netns', 'exec'])
+
+ def exec_args(self, userargs):
+ args = userargs[4:]
+ if args:
+ args[0] = os.path.basename(args[0])
+ return args