# dhcp-agent uses cat
cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline
+
+# ip_lib
+ip: IpFilter, /sbin/ip, root
+ip_usr: IpFilter, /usr/sbin/ip, root
+ip_exec: IpNetnsExecFilter, /sbin/ip, root
+ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
sysctl: CommandFilter, /sbin/sysctl, root
# ip_lib
-ip: CommandFilter, /sbin/ip, root
-ip_usr: CommandFilter, /usr/sbin/ip, root
+ip: IpFilter, /sbin/ip, root
+ip_usr: IpFilter, /usr/sbin/ip, root
+ip_exec: IpNetnsExecFilter, /sbin/ip, root
+ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
# ovs_lib (if OVSInterfaceDriver is used)
ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root
# from the old mechanism
brctl: CommandFilter, /sbin/brctl, root
brctl_usr: CommandFilter, /usr/sbin/brctl, root
-ip: CommandFilter, /sbin/ip, root
-ip_usr: CommandFilter, /usr/sbin/ip, root
+
+# ip_lib
+ip: IpFilter, /sbin/ip, root
+ip_usr: IpFilter, /usr/sbin/ip, root
+ip_exec: IpNetnsExecFilter, /sbin/ip, root
+ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
return None
+class ExecCommandFilter(CommandFilter):
+ def exec_args(self, userargs):
+ return []
+
+
class RegExpFilter(CommandFilter):
"""Command filter doing regexp matching for every argument"""
if len(userargs) != 2:
return False
return True
+
+
+class IpFilter(CommandFilter):
+ """Specific filter for the ip utility to that does not match exec."""
+
+ def match(self, userargs):
+ if userargs[0] == 'ip':
+ if userargs[1] == 'netns':
+ if userargs[2] in ('list', 'add', 'delete'):
+ return True
+ else:
+ return False
+ else:
+ return True
+
+
+class IpNetnsExecFilter(ExecCommandFilter):
+ """Specific filter for the ip utility to that does match exec."""
+ def match(self, userargs):
+ if userargs[:3] == ['ip', 'netns', 'exec']:
+ return True
+ else:
+ return False
+
+ def exec_args(self, userargs):
+ return userargs[4:]
return filterlist
-def match_filter(filters, userargs):
+def match_filter(filter_list, userargs):
"""
Checks user command and arguments through command filters and
returns the first matching filter, or None is none matched.
found_filter = None
- for f in filters:
+ for f in filter_list:
if f.match(userargs):
+ if isinstance(f, filters.ExecCommandFilter):
+ # This command calls exec verify that remaining args
+ # matches another filter.
+ leaf_filters = [fltr for fltr in filter_list
+ if not isinstance(fltr,
+ filters.ExecCommandFilter)]
+ args = f.exec_args(userargs)
+ if not args or not match_filter(leaf_filters, args):
+ continue
+
# Try other filters if executable is absent
if not os.access(f.exec_path, os.X_OK):
if not found_filter:
import os
import subprocess
+import unittest2 as unittest
+
from quantum.rootwrap import filters
from quantum.rootwrap import wrapper
-import unittest
class RootwrapTestCase(unittest.TestCase):
self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
self.assertTrue(f.match(usercmd))
+ def test_IpFilter_non_netns(self):
+ f = filters.IpFilter('/sbin/ip', 'root')
+ self.assertTrue(f.match(['ip', 'link', 'list']))
+
+ def _test_IpFilter_netns_helper(self, action):
+ f = filters.IpFilter('/sbin/ip', 'root')
+ self.assertTrue(f.match(['ip', 'link', action]))
+
+ def test_IpFilter_netns_add(self):
+ self._test_IpFilter_netns_helper('add')
+
+ def test_IpFilter_netns_delete(self):
+ self._test_IpFilter_netns_helper('delete')
+
+ def test_IpFilter_netns_list(self):
+ self._test_IpFilter_netns_helper('list')
+
+ def test_IpNetnsExecFilter_match(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
+ self.assertTrue(
+ f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
+
+ def test_IpNetnsExecFilter_nomatch(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
+ self.assertFalse(f.match(['ip', 'link', 'list']))
+
+ def test_match_filter_recurses_exec_command_filter(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'root')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
+
+ self.assertIsNotNone(wrapper.match_filter(filter_list, args))
+
+ def test_match_filter_recurses_exec_command_filter(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'root')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
+ 'ip', 'link', 'list']
+
+ self.assertIsNone(wrapper.match_filter(filter_list, args))
+
def test_skips(self):
# Check that all filters are skipped and that the last matches
usercmd = ["cat", "/"]