]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Import Oslo's common rootwrap to Neutron
authorThierry Carrez <thierry@openstack.org>
Thu, 27 Jun 2013 13:19:05 +0000 (15:19 +0200)
committerThierry Carrez <thierry@openstack.org>
Mon, 8 Jul 2013 16:25:44 +0000 (18:25 +0200)
Use the common oslo-incubator rootwrap rather than maintain a
specific fork within Neutron.

- Migrated DnsmasqFilter use in dhcp.filters to the new EnvFilter
- Changed environment passing in ip_lib's netns.execute so that
  it can be properly matched using IpNetNsExecFilter + EnvFilter.
  It now calls "ip netns exec ns env A=B C=D command" instead of
  "A=B C=D ip netns exec ns command". Adjusted tests accordingly.

All the other changes are coming directly from the Oslo "rootwrap"
module sync.

Notes:
- Neutron locates its rootwrap.conf in etc/ rather than in etc/neutron
- Neutron maintains a specific bin/quantum-rootwrap-xen-dom0 which
  requires additional config in rootwrap.conf

Both behaviors were preserved in this commit, but this may need to be
addressed in the future to simplify future oslo-rootwrap updates.

Implements bp: quantum-common-rootwrap

Change-Id: I02879942a9d1169a71aa4d684c1b9ec109a6de32

13 files changed:
bin/neutron-rootwrap
bin/quantum-rootwrap
bin/quantum-rootwrap-xen-dom0
etc/neutron/rootwrap.d/dhcp.filters
etc/rootwrap.conf
neutron/agent/linux/ip_lib.py
neutron/openstack/common/rootwrap/__init__.py [moved from neutron/rootwrap/__init__.py with 93% similarity]
neutron/openstack/common/rootwrap/cmd.py [new file with mode: 0755]
neutron/openstack/common/rootwrap/filters.py [moved from neutron/rootwrap/filters.py with 65% similarity]
neutron/openstack/common/rootwrap/wrapper.py [moved from neutron/rootwrap/wrapper.py with 89% similarity]
neutron/tests/unit/test_linux_dhcp.py
neutron/tests/unit/test_linux_ip_lib.py
neutron/tests/unit/test_rootwrap.py [deleted file]

index 73ec76d91c7425d1d52172d863da79f851d4e761..ccd5d93a407e0cf8f73d8702c8893a3121c23519 100755 (executable)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-"""Root wrapper for Neutron
+from neutron.openstack.common.rootwrap import cmd
 
-   Filters which commands neutron is allowed to run as another user.
-
-   To use this, you should set the following in neutron.conf and the
-   various .ini files for the agent plugins:
-   root_helper=sudo neutron-rootwrap /etc/neutron/rootwrap.conf
-
-   You also need to let the neutron user run neutron-rootwrap as root in
-     /etc/sudoers:
-   neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
-                                  /etc/neutron/rootwrap.conf *
-
-   Filter specs live in /etc/neutron/rootwrap.d/*.filters, or
-   other locations pointed to by /etc/neutron/rootwrap.conf.
-   To make allowed commands node-specific, your packaging should only
-   install apropriate .filters for commands which are needed on each
-   node.
-"""
-
-from __future__ import print_function
-
-import ConfigParser
-import logging
-import os
-import pwd
-import signal
-import subprocess
-import sys
-
-
-RC_UNAUTHORIZED = 99
-RC_NOCOMMAND = 98
-RC_BADCONFIG = 97
-RC_NOEXECFOUND = 96
-
-
-def _subprocess_setup():
-    # Python installs a SIGPIPE handler by default. This is usually not what
-    # non-Python subprocesses expect.
-    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
-
-def _exit_error(execname, message, errorcode, log=True):
-    print("%s: %s" % (execname, message))
-    if log:
-        logging.error(message)
-    sys.exit(errorcode)
-
-
-if __name__ == '__main__':
-    # Split arguments, require at least a command
-    execname = sys.argv.pop(0)
-    if len(sys.argv) < 2:
-        _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
-
-    configfile = sys.argv.pop(0)
-    userargs = sys.argv[:]
-
-    # Add ../ to sys.path to allow running from branch
-    possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
-                                                    os.pardir, os.pardir))
-    if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
-        sys.path.insert(0, possible_topdir)
-
-    from neutron.rootwrap import wrapper
-
-    # Load configuration
-    try:
-        rawconfig = ConfigParser.RawConfigParser()
-        rawconfig.read(configfile)
-        config = wrapper.RootwrapConfig(rawconfig)
-    except ValueError as exc:
-        msg = "Incorrect value in %s: %s" % (configfile, exc.message)
-        _exit_error(execname, msg, RC_BADCONFIG, log=False)
-    except ConfigParser.Error:
-        _exit_error(execname, "Incorrect configuration file: %s" % configfile,
-                    RC_BADCONFIG, log=False)
-
-    if config.use_syslog:
-        wrapper.setup_syslog(execname,
-                             config.syslog_log_facility,
-                             config.syslog_log_level)
-
-    # Execute command if it matches any of the loaded filters
-    filters = wrapper.load_filters(config.filters_path)
-    try:
-        filtermatch = wrapper.match_filter(filters, userargs,
-                                           exec_dirs=config.exec_dirs)
-        if filtermatch:
-            command = filtermatch.get_command(userargs,
-                                              exec_dirs=config.exec_dirs)
-            if config.use_syslog:
-                logging.info("(%s > %s) Executing %s (filter match = %s)" % (
-                    os.getlogin(), pwd.getpwuid(os.getuid())[0],
-                    command, filtermatch.name))
-
-            obj = subprocess.Popen(command,
-                                   stdin=sys.stdin,
-                                   stdout=sys.stdout,
-                                   stderr=sys.stderr,
-                                   preexec_fn=_subprocess_setup,
-                                   env=filtermatch.get_environment(userargs))
-            obj.wait()
-            sys.exit(obj.returncode)
-
-    except wrapper.FilterMatchNotExecutable as exc:
-        msg = ("Executable not found: %s (filter match = %s)"
-               % (exc.match.exec_path, exc.match.name))
-        _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
-
-    except wrapper.NoFilterMatched:
-        msg = ("Unauthorized command: %s (no filter matched)"
-               % ' '.join(userargs))
-        _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
+cmd.main()
index 73ec76d91c7425d1d52172d863da79f851d4e761..ccd5d93a407e0cf8f73d8702c8893a3121c23519 100755 (executable)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-"""Root wrapper for Neutron
+from neutron.openstack.common.rootwrap import cmd
 
-   Filters which commands neutron is allowed to run as another user.
-
-   To use this, you should set the following in neutron.conf and the
-   various .ini files for the agent plugins:
-   root_helper=sudo neutron-rootwrap /etc/neutron/rootwrap.conf
-
-   You also need to let the neutron user run neutron-rootwrap as root in
-     /etc/sudoers:
-   neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
-                                  /etc/neutron/rootwrap.conf *
-
-   Filter specs live in /etc/neutron/rootwrap.d/*.filters, or
-   other locations pointed to by /etc/neutron/rootwrap.conf.
-   To make allowed commands node-specific, your packaging should only
-   install apropriate .filters for commands which are needed on each
-   node.
-"""
-
-from __future__ import print_function
-
-import ConfigParser
-import logging
-import os
-import pwd
-import signal
-import subprocess
-import sys
-
-
-RC_UNAUTHORIZED = 99
-RC_NOCOMMAND = 98
-RC_BADCONFIG = 97
-RC_NOEXECFOUND = 96
-
-
-def _subprocess_setup():
-    # Python installs a SIGPIPE handler by default. This is usually not what
-    # non-Python subprocesses expect.
-    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
-
-def _exit_error(execname, message, errorcode, log=True):
-    print("%s: %s" % (execname, message))
-    if log:
-        logging.error(message)
-    sys.exit(errorcode)
-
-
-if __name__ == '__main__':
-    # Split arguments, require at least a command
-    execname = sys.argv.pop(0)
-    if len(sys.argv) < 2:
-        _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
-
-    configfile = sys.argv.pop(0)
-    userargs = sys.argv[:]
-
-    # Add ../ to sys.path to allow running from branch
-    possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
-                                                    os.pardir, os.pardir))
-    if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
-        sys.path.insert(0, possible_topdir)
-
-    from neutron.rootwrap import wrapper
-
-    # Load configuration
-    try:
-        rawconfig = ConfigParser.RawConfigParser()
-        rawconfig.read(configfile)
-        config = wrapper.RootwrapConfig(rawconfig)
-    except ValueError as exc:
-        msg = "Incorrect value in %s: %s" % (configfile, exc.message)
-        _exit_error(execname, msg, RC_BADCONFIG, log=False)
-    except ConfigParser.Error:
-        _exit_error(execname, "Incorrect configuration file: %s" % configfile,
-                    RC_BADCONFIG, log=False)
-
-    if config.use_syslog:
-        wrapper.setup_syslog(execname,
-                             config.syslog_log_facility,
-                             config.syslog_log_level)
-
-    # Execute command if it matches any of the loaded filters
-    filters = wrapper.load_filters(config.filters_path)
-    try:
-        filtermatch = wrapper.match_filter(filters, userargs,
-                                           exec_dirs=config.exec_dirs)
-        if filtermatch:
-            command = filtermatch.get_command(userargs,
-                                              exec_dirs=config.exec_dirs)
-            if config.use_syslog:
-                logging.info("(%s > %s) Executing %s (filter match = %s)" % (
-                    os.getlogin(), pwd.getpwuid(os.getuid())[0],
-                    command, filtermatch.name))
-
-            obj = subprocess.Popen(command,
-                                   stdin=sys.stdin,
-                                   stdout=sys.stdout,
-                                   stderr=sys.stderr,
-                                   preexec_fn=_subprocess_setup,
-                                   env=filtermatch.get_environment(userargs))
-            obj.wait()
-            sys.exit(obj.returncode)
-
-    except wrapper.FilterMatchNotExecutable as exc:
-        msg = ("Executable not found: %s (filter match = %s)"
-               % (exc.match.exec_path, exc.match.name))
-        _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
-
-    except wrapper.NoFilterMatched:
-        msg = ("Unauthorized command: %s (no filter matched)"
-               % ' '.join(userargs))
-        _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
+cmd.main()
index 062fd216414c66e450fae3764d428c5d496e2bdb..7b5ba21e520e7ac6a6031f8f0c41a6f54c35992c 100755 (executable)
@@ -95,7 +95,7 @@ def filter_command(exec_name, filters_path, user_args, exec_dirs):
     if os.path.exists(os.path.join(possible_topdir, "quantum", "__init__.py")):
         sys.path.insert(0, possible_topdir)
 
-    from quantum.rootwrap import wrapper
+    from neutron.openstack.common.rootwrap import wrapper
 
     # Execute command if it matches any of the loaded filters
     filters = wrapper.load_filters(filters_path)
index af55f521e8d7a505d29fe20becd58442aff625ef..e615ddb9a39498e24249948d814554f2e2fd785c 100644 (file)
@@ -9,9 +9,7 @@
 [Filters]
 
 # dhcp-agent
-ip_exec_dnsmasq: DnsmasqNetnsFilter, ip, root
-dnsmasq: DnsmasqFilter, /sbin/dnsmasq, root
-dnsmasq_usr: DnsmasqFilter, /usr/sbin/dnsmasq, root
+dnsmasq: EnvFilter, dnsmasq, root, NEUTRON_RELAY_SOCKET_PATH=, NEUTRON_NETWORK_ID=
 # dhcp-agent uses kill as well, that's handled by the generic KillFilter
 # it looks like these are the only signals needed, per
 # neutron/agent/linux/dhcp.py
index 2169edfa16e838e67d95c267107a3bfab2d67aac..77521d324e8c5725603e68c24558a61caf00c290 100644 (file)
@@ -1,3 +1,6 @@
+# Configuration for neutron-rootwrap
+# This file should be owned by (and only-writeable by) the root user
+
 [DEFAULT]
 # List of directories to load filter definitions from (separated by ',').
 # These directories MUST all be only writeable by root !
@@ -9,6 +12,20 @@ filters_path=/etc/quantum/rootwrap.d,/usr/share/quantum/rootwrap
 # These directories MUST all be only writeable by root !
 exec_dirs=/sbin,/usr/sbin,/bin,/usr/bin
 
+# Enable logging to syslog
+# Default value is False
+use_syslog=False
+
+# Which syslog facility to use.
+# Valid values include auth, authpriv, syslog, user0, user1...
+# Default value is 'syslog'
+syslog_log_facility=syslog
+
+# Which messages to log.
+# INFO means log all usage
+# ERROR means only log unsuccessful attempts
+syslog_log_level=ERROR
+
 [xenapi]
 # XenAPI configuration is only required by the L2 agent if it is to
 # target a XenServer/XCP compute host's dom0.
index 8590aa20f982420033bb358aee4dc7f24acbf15c..3eadbfbb9a78e564f5e7dd3a204f361b4497fe66 100644 (file)
@@ -424,9 +424,13 @@ class IpNetnsCommand(IpCommandBase):
         elif not self._parent.namespace:
             raise Exception(_('No namespace defined for parent'))
         else:
+            env_params = []
+            if addl_env:
+                env_params = (['env'] +
+                              ['%s=%s' % pair for pair in addl_env.items()])
             return utils.execute(
-                ['%s=%s' % pair for pair in addl_env.items()] +
-                ['ip', 'netns', 'exec', self._parent.namespace] + list(cmds),
+                ['ip', 'netns', 'exec', self._parent.namespace] +
+                env_params + list(cmds),
                 root_helper=self._parent.root_helper,
                 check_exit_code=check_exit_code)
 
similarity index 93%
rename from neutron/rootwrap/__init__.py
rename to neutron/openstack/common/rootwrap/__init__.py
index d5943e88553f55f29655514151dfec719d8020d3..2d32e4ef3182127d96c73681c4788f2fd5ed5e54 100644 (file)
@@ -1,6 +1,6 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
-# Copyright (c) 2012 OpenStack Foundation.
+# Copyright (c) 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/neutron/openstack/common/rootwrap/cmd.py b/neutron/openstack/common/rootwrap/cmd.py
new file mode 100755 (executable)
index 0000000..3ac8c5b
--- /dev/null
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Root wrapper for OpenStack services
+
+   Filters which commands a service is allowed to run as another user.
+
+   To use this with neutron, you should set the following in
+   neutron.conf:
+   rootwrap_config=/etc/neutron/rootwrap.conf
+
+   You also need to let the neutron user run neutron-rootwrap
+   as root in sudoers:
+   neutron ALL = (root) NOPASSWD: /usr/bin/neutron-rootwrap
+                                   /etc/neutron/rootwrap.conf *
+
+   Service packaging should deploy .filters files only on nodes where
+   they are needed, to avoid allowing more than is necessary.
+"""
+
+from __future__ import print_function
+
+import ConfigParser
+import logging
+import os
+import pwd
+import signal
+import subprocess
+import sys
+
+
+RC_UNAUTHORIZED = 99
+RC_NOCOMMAND = 98
+RC_BADCONFIG = 97
+RC_NOEXECFOUND = 96
+
+
+def _subprocess_setup():
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def _exit_error(execname, message, errorcode, log=True):
+    print("%s: %s" % (execname, message))
+    if log:
+        logging.error(message)
+    sys.exit(errorcode)
+
+
+def main():
+    # Split arguments, require at least a command
+    execname = sys.argv.pop(0)
+    if len(sys.argv) < 2:
+        _exit_error(execname, "No command specified", RC_NOCOMMAND, log=False)
+
+    configfile = sys.argv.pop(0)
+    userargs = sys.argv[:]
+
+    # Add ../ to sys.path to allow running from branch
+    possible_topdir = os.path.normpath(os.path.join(os.path.abspath(execname),
+                                                    os.pardir, os.pardir))
+    if os.path.exists(os.path.join(possible_topdir, "neutron", "__init__.py")):
+        sys.path.insert(0, possible_topdir)
+
+    from neutron.openstack.common.rootwrap import wrapper
+
+    # Load configuration
+    try:
+        rawconfig = ConfigParser.RawConfigParser()
+        rawconfig.read(configfile)
+        config = wrapper.RootwrapConfig(rawconfig)
+    except ValueError as exc:
+        msg = "Incorrect value in %s: %s" % (configfile, exc.message)
+        _exit_error(execname, msg, RC_BADCONFIG, log=False)
+    except ConfigParser.Error:
+        _exit_error(execname, "Incorrect configuration file: %s" % configfile,
+                    RC_BADCONFIG, log=False)
+
+    if config.use_syslog:
+        wrapper.setup_syslog(execname,
+                             config.syslog_log_facility,
+                             config.syslog_log_level)
+
+    # Execute command if it matches any of the loaded filters
+    filters = wrapper.load_filters(config.filters_path)
+    try:
+        filtermatch = wrapper.match_filter(filters, userargs,
+                                           exec_dirs=config.exec_dirs)
+        if filtermatch:
+            command = filtermatch.get_command(userargs,
+                                              exec_dirs=config.exec_dirs)
+            if config.use_syslog:
+                logging.info("(%s > %s) Executing %s (filter match = %s)" % (
+                    os.getlogin(), pwd.getpwuid(os.getuid())[0],
+                    command, filtermatch.name))
+
+            obj = subprocess.Popen(command,
+                                   stdin=sys.stdin,
+                                   stdout=sys.stdout,
+                                   stderr=sys.stderr,
+                                   preexec_fn=_subprocess_setup,
+                                   env=filtermatch.get_environment(userargs))
+            obj.wait()
+            sys.exit(obj.returncode)
+
+    except wrapper.FilterMatchNotExecutable as exc:
+        msg = ("Executable not found: %s (filter match = %s)"
+               % (exc.match.exec_path, exc.match.name))
+        _exit_error(execname, msg, RC_NOEXECFOUND, log=config.use_syslog)
+
+    except wrapper.NoFilterMatched:
+        msg = ("Unauthorized command: %s (no filter matched)"
+               % ' '.join(userargs))
+        _exit_error(execname, msg, RC_UNAUTHORIZED, log=config.use_syslog)
similarity index 65%
rename from neutron/rootwrap/filters.py
rename to neutron/openstack/common/rootwrap/filters.py
index e14f2c8bd2da2371ece7d4685bb424797bc2e823..dfec41224334bba867a4f9bd661ac05b40f71ee7 100644 (file)
@@ -1,6 +1,6 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
-# Copyright (c) 2012 OpenStack Foundation.
+# Copyright (c) 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -34,7 +34,7 @@ class CommandFilter(object):
         if self.real_exec is not None:
             return self.real_exec
         self.real_exec = ""
-        if self.exec_path.startswith('/'):
+        if os.path.isabs(self.exec_path):
             if os.access(self.exec_path, os.X_OK):
                 self.real_exec = self.exec_path
         else:
@@ -62,11 +62,6 @@ class CommandFilter(object):
         return None
 
 
-class ExecCommandFilter(CommandFilter):
-    def exec_args(self, userargs):
-        return []
-
-
 class RegExpFilter(CommandFilter):
     """Command filter doing regexp matching for every argument."""
 
@@ -140,62 +135,39 @@ class PathFilter(CommandFilter):
 class DnsmasqFilter(CommandFilter):
     """Specific filter for the dnsmasq call (which includes env)."""
 
-    def is_dnsmasq_cmd(self, argv):
-        if (argv[0] == "dnsmasq"):
-            return True
-        return False
-
-    def is_dnsmasq_env_vars(self, argv):
-        if (argv[0].startswith("NEUTRON_RELAY_SOCKET_PATH=") and
-            argv[1].startswith("NEUTRON_NETWORK_ID=")):
-            return True
-        return False
+    CONFIG_FILE_ARG = 'CONFIG_FILE'
 
     def match(self, userargs):
-        """This matches the combination of the leading env
-        vars plus "dnsmasq"
-        """
-        if (self.is_dnsmasq_env_vars(userargs) and
-            self.is_dnsmasq_cmd(userargs[2:])):
+        if (userargs[0] == 'env' and
+                userargs[1].startswith(self.CONFIG_FILE_ARG) and
+                userargs[2].startswith('NETWORK_ID=') and
+                userargs[3] == 'dnsmasq'):
             return True
         return False
 
     def get_command(self, userargs, exec_dirs=[]):
         to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
-        return [to_exec] + userargs[3:]
+        dnsmasq_pos = userargs.index('dnsmasq')
+        return [to_exec] + userargs[dnsmasq_pos + 1:]
 
     def get_environment(self, userargs):
         env = os.environ.copy()
-        env['NEUTRON_RELAY_SOCKET_PATH'] = userargs[0].split('=')[-1]
-        env['NEUTRON_NETWORK_ID'] = userargs[1].split('=')[-1]
+        env[self.CONFIG_FILE_ARG] = userargs[1].split('=')[-1]
+        env['NETWORK_ID'] = userargs[2].split('=')[-1]
         return env
 
 
-class DnsmasqNetnsFilter(DnsmasqFilter):
-    """Specific filter for the dnsmasq call (which includes env)."""
-
-    def is_ip_netns_cmd(self, argv):
-        if ((argv[0] == "ip") and
-            (argv[1] == "netns") and
-            (argv[2] == "exec")):
-            return True
-        return False
-
-    def match(self, userargs):
-        """This matches the combination of the leading env
-        vars plus "ip" "netns" "exec" <foo> "dnsmasq"
-        """
-        if (self.is_dnsmasq_env_vars(userargs) and
-            self.is_ip_netns_cmd(userargs[2:]) and
-            self.is_dnsmasq_cmd(userargs[6:])):
-            return True
-        return False
+class DeprecatedDnsmasqFilter(DnsmasqFilter):
+    """Variant of dnsmasq filter to support old-style FLAGFILE."""
+    CONFIG_FILE_ARG = 'FLAGFILE'
 
 
 class KillFilter(CommandFilter):
     """Specific filter for the kill calls.
+
        1st argument is the user to run /bin/kill under
        2nd argument is the location of the affected executable
+           if the argument is not absolute, it is checked against $PATH
        Subsequent arguments list the accepted signals (if any)
 
        This filter relies on /proc to accurately determine affected
@@ -224,17 +196,28 @@ class KillFilter(CommandFilter):
                 return False
         try:
             command = os.readlink("/proc/%d/exe" % int(args[1]))
-            # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
-            # the end if an executable is updated or deleted
-            if command.endswith(" (deleted)"):
-                command = command[:command.rindex(" ")]
-            if command != self.args[0]:
-                # Affected executable does not match
-                return False
         except (ValueError, OSError):
             # Incorrect PID
             return False
-        return True
+
+        # NOTE(yufang521247): /proc/PID/exe may have '\0' on the
+        # end, because python doen't stop at '\0' when read the
+        # target path.
+        command = command.partition('\0')[0]
+
+        # NOTE(dprince): /proc/PID/exe may have ' (deleted)' on
+        # the end if an executable is updated or deleted
+        if command.endswith(" (deleted)"):
+            command = command[:-len(" (deleted)")]
+
+        kill_command = self.args[0]
+
+        if os.path.isabs(kill_command):
+            return kill_command == command
+
+        return (os.path.isabs(command) and
+                kill_command == os.path.basename(command) and
+                os.path.dirname(command) in os.environ['PATH'].split(':'))
 
 
 class ReadFileFilter(CommandFilter):
@@ -260,22 +243,106 @@ class IpFilter(CommandFilter):
     def match(self, userargs):
         if userargs[0] == 'ip':
             if userargs[1] == 'netns':
-                if userargs[2] in ('list', 'add', 'delete'):
-                    return True
-                else:
-                    return False
+                return (userargs[2] in ('list', 'add', 'delete'))
             else:
                 return True
 
 
-class IpNetnsExecFilter(ExecCommandFilter):
+class EnvFilter(CommandFilter):
+    """Specific filter for the env utility.
+
+    Behaves like CommandFilter, except that it handles
+    leading env A=B.. strings appropriately.
+    """
+
+    def _extract_env(self, arglist):
+        """Extract all leading NAME=VALUE arguments from arglist."""
+
+        envs = set()
+        for arg in arglist:
+            if '=' not in arg:
+                break
+            envs.add(arg.partition('=')[0])
+        return envs
+
+    def __init__(self, exec_path, run_as, *args):
+        super(EnvFilter, self).__init__(exec_path, run_as, *args)
+
+        env_list = self._extract_env(self.args)
+        # Set exec_path to X when args are in the form of
+        # env A=a B=b C=c X Y Z
+        if "env" in exec_path and len(env_list) < len(self.args):
+            self.exec_path = self.args[len(env_list)]
+
+    def match(self, userargs):
+        # ignore leading 'env'
+        if userargs[0] == 'env':
+            userargs.pop(0)
+
+        # require one additional argument after configured ones
+        if len(userargs) < len(self.args):
+            return False
+
+        # extract all env args
+        user_envs = self._extract_env(userargs)
+        filter_envs = self._extract_env(self.args)
+        user_command = userargs[len(user_envs):len(user_envs) + 1]
+
+        # match first non-env argument with CommandFilter
+        return (super(EnvFilter, self).match(user_command)
+                and len(filter_envs) and user_envs == filter_envs)
+
+    def exec_args(self, userargs):
+        args = userargs[:]
+
+        # ignore leading 'env'
+        if args[0] == 'env':
+            args.pop(0)
+
+        # Throw away leading NAME=VALUE arguments
+        while args and '=' in args[0]:
+            args.pop(0)
+
+        return args
+
+    def get_command(self, userargs, exec_dirs=[]):
+        to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
+        return [to_exec] + self.exec_args(userargs)[1:]
+
+    def get_environment(self, userargs):
+        env = os.environ.copy()
+
+        # ignore leading 'env'
+        if userargs[0] == 'env':
+            userargs.pop(0)
+
+        # Handle leading NAME=VALUE pairs
+        for a in userargs:
+            env_name, equals, env_value = a.partition('=')
+            if not equals:
+                break
+            if env_name and env_value:
+                env[env_name] = env_value
+
+        return env
+
+
+class ChainingFilter(CommandFilter):
+    def exec_args(self, userargs):
+        return []
+
+
+class IpNetnsExecFilter(ChainingFilter):
     """Specific filter for the ip utility to that does match exec."""
+
     def match(self, userargs):
-        if userargs[:3] == ['ip', 'netns', 'exec']:
-            return True
-        else:
+        # Network namespaces currently require root
+        # require <ns> argument
+        if self.run_as != "root" or len(userargs) < 4:
             return False
 
+        return (userargs[:3] == ['ip', 'netns', 'exec'])
+
     def exec_args(self, userargs):
         args = userargs[4:]
         if args:
similarity index 89%
rename from neutron/rootwrap/wrapper.py
rename to neutron/openstack/common/rootwrap/wrapper.py
index 24977f9af6a789a08a3572a960cc11ca79b7037f..fa54ad8d46c53777ee598d932bf694e2cb7099bf 100644 (file)
@@ -1,6 +1,6 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
-# Copyright (c) 2012 OpenStack Foundation.
+# Copyright (c) 2011 OpenStack Foundation.
 # All Rights Reserved.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -22,7 +22,7 @@ import logging.handlers
 import os
 import string
 
-from neutron.rootwrap import filters
+from neutron.openstack.common.rootwrap import filters
 
 
 class NoFilterMatched(Exception):
@@ -119,10 +119,10 @@ def load_filters(filters_path):
 
 
 def match_filter(filter_list, userargs, exec_dirs=[]):
-    """Return first matched filter from command filters.
+    """Checks user command and arguments through command filters.
+
+    Returns the first matching filter.
 
-    Checks user command and arguments through command filters and
-    returns the first matching filter.
     Raises NoFilterMatched if no filter matched.
     Raises FilterMatchNotExecutable if no executable was found for the
     best filter match.
@@ -131,15 +131,18 @@ def match_filter(filter_list, userargs, exec_dirs=[]):
 
     for f in filter_list:
         if f.match(userargs):
-            if isinstance(f, filters.ExecCommandFilter):
+            if isinstance(f, filters.ChainingFilter):
                 # This command calls exec verify that remaining args
                 # matches another filter.
+                def non_chain_filter(fltr):
+                    return (fltr.run_as == f.run_as
+                            and not isinstance(fltr, filters.ChainingFilter))
+
                 leaf_filters = [fltr for fltr in filter_list
-                                if not isinstance(fltr,
-                                                  filters.ExecCommandFilter)]
+                                if non_chain_filter(fltr)]
                 args = f.exec_args(userargs)
-                if (not args or not
-                    match_filter(leaf_filters, args, exec_dirs=exec_dirs)):
+                if (not args or not match_filter(leaf_filters,
+                                                 args, exec_dirs=exec_dirs)):
                     continue
 
             # Try other filters if executable is absent
index 4296db353dfafd6d515c6f60244dd57716ba7065..ab2815e4253924dc1e8e3c9a48c4f2e813663621 100644 (file)
@@ -428,12 +428,13 @@ class TestDnsmasq(TestBase):
                 raise IndexError
 
         expected = [
-            'NEUTRON_RELAY_SOCKET_PATH=/dhcp/lease_relay',
-            'NEUTRON_NETWORK_ID=cccccccc-cccc-cccc-cccc-cccccccccccc',
             'ip',
             'netns',
             'exec',
             'qdhcp-ns',
+            'env',
+            'NEUTRON_RELAY_SOCKET_PATH=/dhcp/lease_relay',
+            'NEUTRON_NETWORK_ID=cccccccc-cccc-cccc-cccc-cccccccccccc',
             'dnsmasq',
             '--no-hosts',
             '--no-resolv',
index 4d3266b8f5f23b6090c8c8246ee69607828a3c83..b186dbbe337052446baf40c6ac717350f8f503c2 100644 (file)
@@ -670,8 +670,8 @@ class TestIpNetnsCommand(TestIPCmdBase):
             env = dict(FOO=1, BAR=2)
             self.netns_cmd.execute(['ip', 'link', 'list'], env)
             execute.assert_called_once_with(
-                ['FOO=1', 'BAR=2', 'ip', 'netns', 'exec', 'ns', 'ip', 'link',
-                 'list'],
+                ['ip', 'netns', 'exec', 'ns', 'env', 'FOO=1', 'BAR=2',
+                 'ip', 'link', 'list'],
                 root_helper='sudo', check_exit_code=True)
 
 
diff --git a/neutron/tests/unit/test_rootwrap.py b/neutron/tests/unit/test_rootwrap.py
deleted file mode 100644 (file)
index bc5fc25..0000000
+++ /dev/null
@@ -1,366 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-#    Copyright 2011 OpenStack Foundation
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import ConfigParser
-import logging
-import logging.handlers
-import os
-import subprocess
-import uuid
-
-import fixtures
-
-from neutron.rootwrap import filters
-from neutron.rootwrap import wrapper
-from neutron.tests import base
-
-
-class RootwrapTestCase(base.BaseTestCase):
-
-    def setUp(self):
-        super(RootwrapTestCase, self).setUp()
-        self.filters = [
-            filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
-            filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
-            filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
-            filters.CommandFilter("/nonexistent/cat", "root"),
-            filters.CommandFilter("/bin/cat", "root")  # Keep this one last
-        ]
-
-    def test_RegExpFilter_match(self):
-        usercmd = ["ls", "/root"]
-        filtermatch = wrapper.match_filter(self.filters, usercmd)
-        self.assertFalse(filtermatch is None)
-        self.assertEqual(filtermatch.get_command(usercmd),
-                         ["/bin/ls", "/root"])
-
-    def test_RegExpFilter_reject(self):
-        usercmd = ["ls", "root"]
-        self.assertRaises(wrapper.NoFilterMatched,
-                          wrapper.match_filter, self.filters, usercmd)
-
-    def test_missing_command(self):
-        valid_but_missing = ["foo_bar_not_exist"]
-        invalid = ["foo_bar_not_exist_and_not_matched"]
-        self.assertRaises(wrapper.FilterMatchNotExecutable,
-                          wrapper.match_filter,
-                          self.filters, valid_but_missing)
-        self.assertRaises(wrapper.NoFilterMatched,
-                          wrapper.match_filter, self.filters, invalid)
-
-    def test_DnsmasqFilter(self):
-        usercmd = ['NEUTRON_RELAY_SOCKET_PATH=A', 'NEUTRON_NETWORK_ID=foobar',
-                   'dnsmasq', 'foo']
-        f = filters.DnsmasqFilter("/usr/bin/dnsmasq", "root")
-        self.assertTrue(f.match(usercmd))
-        self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo'])
-        env = f.get_environment(usercmd)
-        self.assertEqual(env.get('NEUTRON_RELAY_SOCKET_PATH'), 'A')
-        self.assertEqual(env.get('NEUTRON_NETWORK_ID'), 'foobar')
-
-    def test_DnsmasqNetnsFilter(self):
-        usercmd = ['NEUTRON_RELAY_SOCKET_PATH=A', 'NEUTRON_NETWORK_ID=foobar',
-                   'ip', 'netns', 'exec', 'foo', 'dnsmasq', 'foo']
-        f = filters.DnsmasqNetnsFilter("/sbin/ip", "root")
-        self.assertTrue(f.match(usercmd))
-        self.assertEqual(f.get_command(usercmd), ['/sbin/ip', 'netns', 'exec',
-                                                  'foo', 'dnsmasq', 'foo'])
-        env = f.get_environment(usercmd)
-        self.assertEqual(env.get('NEUTRON_RELAY_SOCKET_PATH'), 'A')
-        self.assertEqual(env.get('NEUTRON_NETWORK_ID'), 'foobar')
-
-    def test_KillFilter(self):
-        if not os.path.exists("/proc/%d" % os.getpid()):
-            self.skipTest("Test requires /proc filesystem (procfs)")
-        p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=subprocess.STDOUT)
-        try:
-            f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
-            f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
-            usercmd = ['kill', '-ALRM', p.pid]
-            # Incorrect signal should fail
-            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
-            usercmd = ['kill', p.pid]
-            # Providing no signal should fail
-            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
-            # Providing matching signal should be allowed
-            usercmd = ['kill', '-9', p.pid]
-            self.assertTrue(f.match(usercmd) or f2.match(usercmd))
-
-            f = filters.KillFilter("root", "/bin/cat")
-            f2 = filters.KillFilter("root", "/usr/bin/cat")
-            usercmd = ['kill', os.getpid()]
-            # Our own PID does not match /bin/sleep, so it should fail
-            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
-            usercmd = ['kill', 999999]
-            # Nonexistent PID should fail
-            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
-            usercmd = ['kill', p.pid]
-            # Providing no signal should work
-            self.assertTrue(f.match(usercmd) or f2.match(usercmd))
-        finally:
-            # Terminate the "cat" process and wait for it to finish
-            p.terminate()
-            p.wait()
-
-    def test_KillFilter_no_raise(self):
-        """Makes sure ValueError from bug 926412 is gone."""
-        f = filters.KillFilter("root", "")
-        # Providing anything other than kill should be False
-        usercmd = ['notkill', 999999]
-        self.assertFalse(f.match(usercmd))
-        # Providing something that is not a pid should be False
-        usercmd = ['kill', 'notapid']
-        self.assertFalse(f.match(usercmd))
-
-    def test_KillFilter_deleted_exe(self):
-        """Makes sure deleted exe's are killed correctly."""
-        # See bug #967931.
-        def fake_readlink(blah):
-            return '/bin/commandddddd (deleted)'
-
-        f = filters.KillFilter("root", "/bin/commandddddd")
-        usercmd = ['kill', 1234]
-        # Providing no signal should work
-        self.stubs.Set(os, 'readlink', fake_readlink)
-        self.assertTrue(f.match(usercmd))
-
-    def test_ReadFileFilter(self):
-        goodfn = '/good/file.name'
-        f = filters.ReadFileFilter(goodfn)
-        usercmd = ['cat', '/bad/file']
-        self.assertFalse(f.match(['cat', '/bad/file']))
-        usercmd = ['cat', goodfn]
-        self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
-        self.assertTrue(f.match(usercmd))
-
-    def test_IpFilter_non_netns(self):
-        f = filters.IpFilter('/sbin/ip', 'root')
-        self.assertTrue(f.match(['ip', 'link', 'list']))
-
-    def _test_IpFilter_netns_helper(self, action):
-        f = filters.IpFilter('/sbin/ip', 'root')
-        self.assertTrue(f.match(['ip', 'link', action]))
-
-    def test_IpFilter_netns_add(self):
-        self._test_IpFilter_netns_helper('add')
-
-    def test_IpFilter_netns_delete(self):
-        self._test_IpFilter_netns_helper('delete')
-
-    def test_IpFilter_netns_list(self):
-        self._test_IpFilter_netns_helper('list')
-
-    def test_IpNetnsExecFilter_match(self):
-        f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
-        self.assertTrue(
-            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
-
-    def test_IpNetnsExecFilter_nomatch(self):
-        f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
-        self.assertFalse(f.match(['ip', 'link', 'list']))
-
-    def test_match_filter_recurses_exec_command_filter_matches(self):
-        filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
-                       filters.IpFilter('/sbin/ip', 'root')]
-        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
-
-        self.assertIsNotNone(wrapper.match_filter(filter_list, args))
-
-    def test_match_filter_recurses_exec_command_filter_does_not_match(self):
-        filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
-                       filters.IpFilter('/sbin/ip', 'root')]
-        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
-                'ip', 'link', 'list']
-
-        self.assertRaises(wrapper.NoFilterMatched,
-                          wrapper.match_filter, filter_list, args)
-
-    def test_exec_dirs_search(self):
-        # This test supposes you have /bin/cat or /usr/bin/cat locally
-        f = filters.CommandFilter("cat", "root")
-        usercmd = ['cat', '/f']
-        self.assertTrue(f.match(usercmd))
-        self.assertTrue(f.get_command(usercmd,
-                                      exec_dirs=['/bin', '/usr/bin'])
-                        in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
-
-    def test_skips(self):
-        # Check that all filters are skipped and that the last matches
-        usercmd = ["cat", "/"]
-        filtermatch = wrapper.match_filter(self.filters, usercmd)
-        self.assertTrue(filtermatch is self.filters[-1])
-
-    def test_RootwrapConfig(self):
-        raw = ConfigParser.RawConfigParser()
-
-        # Empty config should raise ConfigParser.Error
-        self.assertRaises(ConfigParser.Error, wrapper.RootwrapConfig, raw)
-
-        # Check default values
-        raw.set('DEFAULT', 'filters_path', '/a,/b')
-        config = wrapper.RootwrapConfig(raw)
-        self.assertEqual(config.filters_path, ['/a', '/b'])
-        self.assertEqual(config.exec_dirs, os.environ["PATH"].split(':'))
-        self.assertFalse(config.use_syslog)
-        self.assertEqual(config.syslog_log_facility,
-                         logging.handlers.SysLogHandler.LOG_SYSLOG)
-        self.assertEqual(config.syslog_log_level, logging.ERROR)
-
-        # Check general values
-        raw.set('DEFAULT', 'exec_dirs', '/a,/x')
-        config = wrapper.RootwrapConfig(raw)
-        self.assertEqual(config.exec_dirs, ['/a', '/x'])
-
-        raw.set('DEFAULT', 'use_syslog', 'oui')
-        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
-        raw.set('DEFAULT', 'use_syslog', 'true')
-        config = wrapper.RootwrapConfig(raw)
-        self.assertTrue(config.use_syslog)
-
-        raw.set('DEFAULT', 'syslog_log_facility', 'moo')
-        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
-        raw.set('DEFAULT', 'syslog_log_facility', 'local0')
-        config = wrapper.RootwrapConfig(raw)
-        self.assertEqual(config.syslog_log_facility,
-                         logging.handlers.SysLogHandler.LOG_LOCAL0)
-        raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
-        config = wrapper.RootwrapConfig(raw)
-        self.assertEqual(config.syslog_log_facility,
-                         logging.handlers.SysLogHandler.LOG_AUTH)
-
-        raw.set('DEFAULT', 'syslog_log_level', 'bar')
-        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
-        raw.set('DEFAULT', 'syslog_log_level', 'INFO')
-        config = wrapper.RootwrapConfig(raw)
-        self.assertEqual(config.syslog_log_level, logging.INFO)
-
-
-class PathFilterTestCase(base.BaseTestCase):
-    def setUp(self):
-        super(PathFilterTestCase, self).setUp()
-
-        tmpdir = fixtures.TempDir('/tmp')
-        self.useFixture(tmpdir)
-
-        self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
-
-        gen_name = lambda: str(uuid.uuid4())
-
-        self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
-        self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join('/tmp', 'some')
-        self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
-                                                 'some')
-        self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
-
-        self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
-                                                         gen_name())
-        os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
-                   self.TRAVERSAL_SYMLINK_WITHIN_DIR)
-
-        self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
-                                                          gen_name())
-        os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
-                   self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
-
-        self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
-        os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
-
-        self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
-        os.symlink(os.path.join('/tmp', 'some_file'), self.SYMLINK_OUTSIDE_DIR)
-
-    def test_argument_pass_constraint(self):
-        f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
-
-        args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
-        self.assertTrue(f.match(args))
-
-    def test_argument_equality_constraint(self):
-        f = filters.PathFilter('/bin/chown', 'root', 'nova', '/tmp/spam/eggs')
-
-        args = ['chown', 'nova', '/tmp/spam/eggs']
-        self.assertTrue(f.match(args))
-
-        args = ['chown', 'neutron', '/tmp/spam/eggs']
-        self.assertFalse(f.match(args))
-
-    def test_wrong_arguments_number(self):
-        args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
-        self.assertFalse(self.f.match(args))
-
-    def test_wrong_exec_command(self):
-        args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
-        self.assertFalse(self.f.match(args))
-
-    def test_match(self):
-        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
-        self.assertTrue(self.f.match(args))
-
-    def test_match_traversal(self):
-        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
-        self.assertTrue(self.f.match(args))
-
-    def test_match_symlink(self):
-        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
-        self.assertTrue(self.f.match(args))
-
-    def test_match_traversal_symlink(self):
-        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
-        self.assertTrue(self.f.match(args))
-
-    def test_reject(self):
-        args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
-        self.assertFalse(self.f.match(args))
-
-    def test_reject_traversal(self):
-        args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
-        self.assertFalse(self.f.match(args))
-
-    def test_reject_symlink(self):
-        args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
-        self.assertFalse(self.f.match(args))
-
-    def test_reject_traversal_symlink(self):
-        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
-        self.assertFalse(self.f.match(args))
-
-    def test_get_command(self):
-        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
-        expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
-
-        self.assertEqual(expected, self.f.get_command(args))
-
-    def test_get_command_traversal(self):
-        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
-        expected = ['/bin/chown', 'nova',
-                    os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
-
-        self.assertEqual(expected, self.f.get_command(args))
-
-    def test_get_command_symlink(self):
-        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
-        expected = ['/bin/chown', 'nova',
-                    os.path.realpath(self.SYMLINK_WITHIN_DIR)]
-
-        self.assertEqual(expected, self.f.get_command(args))
-
-    def test_get_command_traversal_symlink(self):
-        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
-        expected = ['/bin/chown', 'nova',
-                    os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
-
-        self.assertEqual(expected, self.f.get_command(args))