- [N321] Validate that jsonutils module is used instead of json
- [N322] Detect common errors with assert_called_once_with
- [N323] Enforce namespace-less imports for oslo libraries
+- [N324] Prevent use of deprecated contextlib.nested.
Creating Unit Tests
-------------------
oslo_namespace_imports_dot = re.compile(r"import[\s]+oslo[.][^\s]+")
oslo_namespace_imports_from_dot = re.compile(r"from[\s]+oslo[.]")
oslo_namespace_imports_from_root = re.compile(r"from[\s]+oslo[\s]+import[\s]+")
+contextlib_nested = re.compile(r"^with (contextlib\.)?nested\(")
def validate_log_translations(logical_line, physical_line, filename):
yield(0, msg)
+def check_no_contextlib_nested(logical_line, filename):
+ msg = ("N324: contextlib.nested is deprecated. With Python 2.7 and later "
+ "the with-statement supports multiple nested objects. See https://"
+ "docs.python.org/2/library/contextlib.html#contextlib.nested for "
+ "more information.")
+
+ # TODO(ankit): The following check is temporary.
+ # A series of patches will be submitted to address
+ # these issues. It should be removed completely
+ # when bug 1428424 is closed.
+ ignore_dirs = [
+ "neutron/plugins/ml2",
+ "neutron/tests/unit/agent/test_securitygroups_rpc.py",
+ "neutron/tests/unit/api",
+ "neutron/tests/unit/db",
+ "neutron/tests/unit/extensions",
+ "neutron/tests/unit/plugins",
+ "neutron/tests/unit/scheduler"]
+ for directory in ignore_dirs:
+ if directory in filename:
+ return
+
+ if contextlib_nested.match(logical_line):
+ yield(0, msg)
+
+
def factory(register):
register(validate_log_translations)
register(use_jsonutils)
register(check_assert_called_once_with)
register(no_translate_debug_logs)
register(check_oslo_namespace_imports)
+ register(check_no_contextlib_nested)
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
import httplib
from oslo_config import cfg
def test_ports_vif_details(self):
plugin = manager.NeutronManager.get_plugin()
cfg.CONF.set_default('allow_overlapping_ips', True)
- with contextlib.nested(self.port(), self.port()):
+ with self.port(), self.port():
ctx = context.get_admin_context()
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
def test_ports_vif_host(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
- with contextlib.nested(
- self.port(name='name1',
- arg_list=(portbindings.HOST_ID,),
- **host_arg),
- self.port(name='name2')):
+ with self.port(name='name1',
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg), self.port(name='name2'):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
def test_ports_vif_host_update(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
- with contextlib.nested(
- self.port(name='name1',
- arg_list=(portbindings.HOST_ID,),
- **host_arg),
- self.port(name='name2')) as (port1, port2):
+ with self.port(name='name1', arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1, self.port(name='name2') as port2:
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}}
req = self.new_update_request('ports', data, port1['port']['id'])
req.get_response(self.api)
def test_ports_vif_host_list(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
- with contextlib.nested(
- self.port(name='name1',
- arg_list=(portbindings.HOST_ID,),
- **host_arg),
- self.port(name='name2'),
- self.port(name='name3',
- arg_list=(portbindings.HOST_ID,),
- **host_arg),) as (port1, _port2, port3):
+ with self.port(name='name1',
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1,\
+ self.port(name='name2'),\
+ self.port(name='name3',
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port3:
self._test_list_resources(
'port', (port1, port3),
query_params='%s=%s' % (portbindings.HOST_ID, self.hostname))
def test_ports_vnic_type(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
- with contextlib.nested(
- self.port(name='name1',
- arg_list=(portbindings.VNIC_TYPE,),
- **vnic_arg),
- self.port(name='name2')):
+ with self.port(name='name1', arg_list=(portbindings.VNIC_TYPE,),
+ **vnic_arg), self.port(name='name2'):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
def test_ports_vnic_type_list(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
- with contextlib.nested(
- self.port(name='name1',
- arg_list=(portbindings.VNIC_TYPE,),
- **vnic_arg),
- self.port(name='name2'),
- self.port(name='name3',
- arg_list=(portbindings.VNIC_TYPE,),
- **vnic_arg),) as (port1, port2, port3):
+ with self.port(name='name1',
+ arg_list=(portbindings.VNIC_TYPE,),
+ **vnic_arg) as port1,\
+ self.port(name='name2') as port2,\
+ self.port(name='name3',
+ arg_list=(portbindings.VNIC_TYPE,),
+ **vnic_arg) as port3:
self._test_list_resources(
'port', (port1, port2, port3),
query_params='%s=%s' % (portbindings.VNIC_TYPE,
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
import copy
import sys
import uuid
proxy = dhcp_agent.DhcpPluginApi('foo', ctxt, None)
proxy.host = 'foo'
- with contextlib.nested(
- mock.patch.object(proxy.client, 'call'),
- mock.patch.object(proxy.client, 'prepare'),
- ) as (
- rpc_mock, prepare_mock
- ):
+ with mock.patch.object(proxy.client, 'call') as rpc_mock,\
+ mock.patch.object(proxy.client, 'prepare') as prepare_mock:
prepare_mock.return_value = proxy.client
rpc_mock.return_value = kwargs.pop('return_value', [])
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
import copy
import eventlet
ri.fip_ns.subscribe = mock.Mock()
ri.fip_ns.agent_router_gateway = mock.Mock()
- with contextlib.nested(mock.patch.object(ri,
- 'get_floating_ips'),
- mock.patch.object(
- ri, 'get_floating_agent_gw_interface')
- ) as (fips,
- fip_gw_port):
+ with mock.patch.object(ri, 'get_floating_ips') as fips,\
+ mock.patch.object(ri, 'get_floating_agent_gw_interface'
+ ) as fip_gw_port:
fips.return_value = fake_floatingips
fip_gw_port.return_value = agent_gateway_port[0]
ri.create_dvr_fip_interfaces(ext_gw_port)
ri.rtr_fip_subnet = None
ri.dist_fip_count = 1
- with contextlib.nested(mock.patch.object(
- ri, 'get_floating_ips'),
- mock.patch.object(
- ri, 'get_floating_agent_gw_interface')
- ) as (fips,
- fip_gw_port):
+ with mock.patch.object(ri, 'get_floating_ips') as fips,\
+ mock.patch.object(ri, 'get_floating_agent_gw_interface'
+ ) as fip_gw_port:
fips.return_value = fake_floatingips
fip_gw_port.return_value = agent_gateway_port[0]
ri.create_dvr_fip_interfaces(ext_gw_port)
self.assertEqual(len(internal_ports), 1)
internal_port = internal_ports[0]
- with contextlib.nested(mock.patch.object(ri,
- 'internal_network_removed'),
- mock.patch.object(ri,
- 'internal_network_added'),
- mock.patch.object(ri,
- 'external_gateway_removed'),
- mock.patch.object(ri,
- 'external_gateway_added')
- ) as (internal_network_removed,
- internal_network_added,
- external_gateway_removed,
- external_gateway_added):
+ with mock.patch.object(ri, 'internal_network_removed'
+ ) as internal_network_removed,\
+ mock.patch.object(ri, 'internal_network_added'
+ ) as internal_network_added,\
+ mock.patch.object(ri, 'external_gateway_removed'
+ ) as external_gateway_removed,\
+ mock.patch.object(ri, 'external_gateway_added'
+ ) as external_gateway_added:
ri.process(agent)
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
-
import eventlet.event
import eventlet.queue
import eventlet.timeout
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
def _test__kill(self, respawning, pid=None):
- with contextlib.nested(
- mock.patch.object(self.proc, '_kill_event'),
+ with mock.patch.object(self.proc, '_kill_event'
+ ) as mock_kill_event,\
mock.patch.object(utils, 'get_root_helper_child_pid',
- return_value=pid),
- mock.patch.object(self.proc, '_kill_process'),
- mock.patch.object(self.proc, '_process')) as (
- mock_kill_event,
- mock_get_child_pid,
- mock_kill_process,
- mock_process):
+ return_value=pid),\
+ mock.patch.object(self.proc, '_kill_process'
+ ) as mock_kill_process,\
+ mock.patch.object(self.proc, '_process'):
self.proc._kill(respawning)
if respawning:
# under the License.
-import contextlib
import os
import sys
daemon.setgid, '321')
log_critical.assert_once_with(mock.ANY)
- def test_drop_no_privileges(self):
- with contextlib.nested(
- mock.patch.object(os, 'setgroups'),
- mock.patch.object(daemon, 'setgid'),
- mock.patch.object(daemon, 'setuid')) as mocks:
- daemon.drop_privileges()
- for cursor in mocks:
- self.assertFalse(cursor.called)
-
- def _test_drop_privileges(self, user=None, group=None):
- with contextlib.nested(
- mock.patch.object(os, 'geteuid', return_value=0),
- mock.patch.object(os, 'setgroups'),
- mock.patch.object(daemon, 'setgid'),
- mock.patch.object(daemon, 'setuid')) as (
- geteuid, setgroups, setgid, setuid):
- daemon.drop_privileges(user=user, group=group)
- if user:
- setuid.assert_called_once_with(user)
- else:
- self.assertFalse(setuid.called)
- if group:
- setgroups.assert_called_once_with([])
- setgid.assert_called_once_with(group)
- else:
- self.assertFalse(setgroups.called)
- self.assertFalse(setgid.called)
+ @mock.patch.object(os, 'setgroups')
+ @mock.patch.object(daemon, 'setgid')
+ @mock.patch.object(daemon, 'setuid')
+ def test_drop_no_privileges(self, mock_setuid, mock_setgid,
+ mock_setgroups):
+ daemon.drop_privileges()
+ for cursor in (mock_setuid, mock_setgid, mock_setgroups):
+ self.assertFalse(cursor.called)
+
+ @mock.patch.object(os, 'geteuid', return_value=0)
+ @mock.patch.object(os, 'setgroups')
+ @mock.patch.object(daemon, 'setgid')
+ @mock.patch.object(daemon, 'setuid')
+ def _test_drop_privileges(self, setuid, setgid, setgroups,
+ geteuid, user=None, group=None):
+ daemon.drop_privileges(user=user, group=group)
+ if user:
+ setuid.assert_called_once_with(user)
+ else:
+ self.assertFalse(setuid.called)
+ if group:
+ setgroups.assert_called_once_with([])
+ setgid.assert_called_once_with(group)
+ else:
+ self.assertFalse(setgroups.called)
+ self.assertFalse(setgid.called)
def test_drop_user_privileges(self):
self._test_drop_privileges(user='user')
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
-
import mock
import testtools
import webob
remote_address = 'remote-address'
expected = ['port1']
networks = (network_id,)
- with contextlib.nested(
- mock.patch.object(self.handler, '_get_ports_for_remote_address'),
- mock.patch.object(self.handler, '_get_router_networks')
- ) as (mock_get_ip_addr, mock_get_router_networks):
+ with mock.patch.object(self.handler,
+ '_get_ports_for_remote_address'
+ ) as mock_get_ip_addr,\
+ mock.patch.object(self.handler,
+ '_get_router_networks'
+ ) as mock_get_router_networks:
mock_get_ip_addr.return_value = expected
ports = self.handler._get_ports(remote_address, network_id,
router_id)
remote_address = 'remote-address'
expected = ['port1']
networks = ('network1', 'network2')
- with contextlib.nested(
- mock.patch.object(self.handler,
- '_get_ports_for_remote_address',
- return_value=expected),
- mock.patch.object(self.handler,
- '_get_router_networks',
- return_value=networks)
- ) as (mock_get_ip_addr, mock_get_router_networks):
+ with mock.patch.object(self.handler,
+ '_get_ports_for_remote_address',
+ return_value=expected
+ ) as mock_get_ip_addr,\
+ mock.patch.object(self.handler,
+ '_get_router_networks',
+ return_value=networks
+ ) as mock_get_router_networks:
ports = self.handler._get_ports(remote_address,
router_id=router_id)
mock_get_router_networks.called_once_with(router_id)
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
-
import mock
from oslo_config import cfg
cfg.CONF.set_override('debug', True)
agent = l3_agent.L3NATAgent('localhost')
- with contextlib.nested(
- mock.patch('os.geteuid', return_value=self.EUID),
- mock.patch('os.getegid', return_value=self.EGID),
+ with mock.patch('os.geteuid', return_value=self.EUID),\
+ mock.patch('os.getegid', return_value=self.EGID),\
mock.patch(is_effective_user,
- side_effect=fake_is_effective_user),
- mock.patch(ip_class_path)) as (
- geteuid, getegid, is_effective_user, ip_mock):
+ side_effect=fake_is_effective_user),\
+ mock.patch(ip_class_path) as ip_mock:
agent.metadata_driver.spawn_monitored_metadata_proxy(
agent.process_monitor,
router_ns,
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
-
import mock
from neutron.common import constants as n_const
self.assertEqual(expected, results)
def test_fdb_add_tun(self):
- with contextlib.nested(
- mock.patch.object(self.fakeagent, 'setup_tunnel_port'),
- mock.patch.object(self.fakeagent, 'add_fdb_flow'),
- ) as (mock_setup_tunnel_port, mock_add_fdb_flow):
+ with mock.patch.object(self.fakeagent, 'setup_tunnel_port'),\
+ mock.patch.object(self.fakeagent, 'add_fdb_flow'
+ ) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.agent_ports,
self._tunnel_port_lookup)
def test_fdb_add_tun_non_existence_key_in_ofports(self):
ofport = self.lvm1.network_type + '0a0a0a0a'
del self.ofports[self.type_gre][self.ports[1].ip]
- with contextlib.nested(
- mock.patch.object(self.fakeagent, 'setup_tunnel_port',
- return_value=ofport),
- mock.patch.object(self.fakeagent, 'add_fdb_flow'),
- ) as (mock_setup_tunnel_port, mock_add_fdb_flow):
+ with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
+ return_value=ofport
+ ) as mock_setup_tunnel_port,\
+ mock.patch.object(self.fakeagent, 'add_fdb_flow'
+ ) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.agent_ports,
self._tunnel_port_lookup)
def test_fdb_add_tun_unavailable_ofport(self):
del self.ofports[self.type_gre][self.ports[1].ip]
- with contextlib.nested(
- mock.patch.object(self.fakeagent, 'setup_tunnel_port',
- return_value=0),
- mock.patch.object(self.fakeagent, 'add_fdb_flow'),
- ) as (mock_setup_tunnel_port, mock_add_fdb_flow):
+ with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
+ return_value=0
+ ) as mock_setup_tunnel_port,\
+ mock.patch.object(self.fakeagent, 'add_fdb_flow'
+ ) as mock_add_fdb_flow:
self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
self.agent_ports,
self._tunnel_port_lookup)
def test_fdb_remove_tun_flooding_entry(self):
self.agent_ports[self.ports[1].ip] = [n_const.FLOODING_ENTRY]
- with contextlib.nested(
- mock.patch.object(self.fakeagent, 'del_fdb_flow'),
- mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'),
- ) as (mock_del_fdb_flow, mock_cleanup_tunnel_port):
+ with mock.patch.object(self.fakeagent, 'del_fdb_flow'
+ ) as mock_del_fdb_flow,\
+ mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'
+ ) as mock_cleanup_tunnel_port:
self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
self.agent_ports,
self._tunnel_port_lookup)
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
-
import mock
from oslo_context import context as oslo_context
import oslo_messaging
agent = rpc.PluginApi('fake_topic')
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
expect_val = 'foo'
- with contextlib.nested(
- mock.patch.object(agent.client, 'call'),
- mock.patch.object(agent.client, 'prepare'),
- ) as (
- mock_call, mock_prepare
- ):
+ with mock.patch.object(agent.client, 'call') as mock_call,\
+ mock.patch.object(agent.client, 'prepare') as mock_prepare:
mock_prepare.return_value = agent.client
mock_call.return_value = expect_val
func_obj = getattr(agent, method)
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
expect_val_get_device_details = 'foo'
expect_val = [expect_val_get_device_details]
- with contextlib.nested(
- mock.patch.object(agent.client, 'call'),
- mock.patch.object(agent.client, 'prepare'),
- ) as (
- mock_call, mock_prepare
- ):
+ with mock.patch.object(agent.client, 'call') as mock_call, \
+ mock.patch.object(agent.client, 'prepare') as mock_prepare:
mock_prepare.return_value = agent.client
mock_call.side_effect = [oslo_messaging.UnsupportedVersion('1.2'),
expect_val_get_device_details]
topic = 'test'
reportStateAPI = rpc.PluginReportStateAPI(topic)
expected_agent_state = {'agent': 'test'}
- with contextlib.nested(
- mock.patch.object(reportStateAPI.client, 'call'),
- mock.patch.object(reportStateAPI.client, 'cast'),
- mock.patch.object(reportStateAPI.client, 'prepare'),
- ) as (
- mock_call, mock_cast, mock_prepare
- ):
+ with mock.patch.object(reportStateAPI.client, 'call') as mock_call, \
+ mock.patch.object(reportStateAPI.client, 'cast'), \
+ mock.patch.object(reportStateAPI.client, 'prepare'
+ ) as mock_prepare:
mock_prepare.return_value = reportStateAPI.client
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
reportStateAPI.report_state(ctxt, expected_agent_state,
topic = 'test'
reportStateAPI = rpc.PluginReportStateAPI(topic)
expected_agent_state = {'agent': 'test'}
- with contextlib.nested(
- mock.patch.object(reportStateAPI.client, 'call'),
- mock.patch.object(reportStateAPI.client, 'cast'),
- mock.patch.object(reportStateAPI.client, 'prepare'),
- ) as (
- mock_call, mock_cast, mock_prepare
- ):
+ with mock.patch.object(reportStateAPI.client, 'call'), \
+ mock.patch.object(reportStateAPI.client, 'cast'
+ ) as mock_cast, \
+ mock.patch.object(reportStateAPI.client, 'prepare'
+ ) as mock_prepare:
mock_prepare.return_value = reportStateAPI.client
ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
reportStateAPI.report_state(ctxt, expected_agent_state)
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
import itertools
import mock
class TestOVSCleanup(base.BaseTestCase):
- def test_main(self):
+ @mock.patch('neutron.common.config.setup_logging')
+ @mock.patch('neutron.cmd.ovs_cleanup.setup_conf')
+ @mock.patch('neutron.agent.common.ovs_lib.BaseOVS.get_bridges')
+ @mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
+ @mock.patch.object(util, 'collect_neutron_ports')
+ @mock.patch.object(util, 'delete_neutron_ports')
+ def test_main(self, mock_delete, mock_collect, mock_ovs,
+ mock_get_bridges, mock_conf, mock_logging):
bridges = ['br-int', 'br-ex']
ports = ['p1', 'p2', 'p3']
conf = mock.Mock()
conf.ovs_all_ports = False
conf.ovs_integration_bridge = 'br-int'
conf.external_network_bridge = 'br-ex'
- with contextlib.nested(
- mock.patch('neutron.common.config.setup_logging'),
- mock.patch('neutron.cmd.ovs_cleanup.setup_conf',
- return_value=conf),
- mock.patch('neutron.agent.common.ovs_lib.BaseOVS.get_bridges',
- return_value=bridges),
- mock.patch('neutron.agent.common.ovs_lib.OVSBridge'),
- mock.patch.object(util, 'collect_neutron_ports',
- return_value=ports),
- mock.patch.object(util, 'delete_neutron_ports')
- ) as (_log, _conf, _get, ovs, collect, delete):
- with mock.patch('neutron.common.config.setup_logging'):
- util.main()
- ovs.assert_has_calls([mock.call().delete_ports(
- all_ports=False)])
- collect.assert_called_once_with(set(bridges))
- delete.assert_called_once_with(ports)
+ mock_conf.return_value = conf
+ mock_get_bridges.return_value = bridges
+ mock_collect.return_value = ports
+
+ util.main()
+ mock_ovs.assert_has_calls([mock.call().delete_ports(
+ all_ports=False)])
+ mock_collect.assert_called_once_with(set(bridges))
+ mock_delete.assert_called_once_with(ports)
def test_collect_neutron_ports(self):
port1 = ovs_lib.VifPort('tap1234', 1, uuidutils.generate_uuid(),
ret = util.collect_neutron_ports(bridges)
self.assertEqual(ret, portnames)
- def test_delete_neutron_ports(self):
+ @mock.patch.object(ip_lib, 'IPDevice')
+ def test_delete_neutron_ports(self, mock_ip):
ports = ['tap1234', 'tap5678', 'tap09ab']
port_found = [True, False, True]
- with contextlib.nested(
- mock.patch.object(ip_lib, 'device_exists',
- side_effect=port_found),
- mock.patch.object(ip_lib, 'IPDevice')
- ) as (device_exists, ip_dev):
+
+ with mock.patch.object(
+ ip_lib, 'device_exists',
+ side_effect=port_found) as device_exists:
util.delete_neutron_ports(ports)
device_exists.assert_has_calls([mock.call(p) for p in ports])
- ip_dev.assert_has_calls(
+ mock_ip.assert_has_calls(
[mock.call('tap1234'),
- mock.call().link.delete(),
- mock.call('tap09ab'),
- mock.call().link.delete()])
+ mock.call().link.delete(),
+ mock.call('tap09ab'),
+ mock.call().link.delete()])
"""Test of Policy Engine For Neutron"""
-import contextlib
import StringIO
import urllib2
'create_fake_resource:fake_resources',
'create_fake_resource:attr:sub_attr_1'], rules)
- def test_log_rule_list(self):
- with contextlib.nested(
- mock.patch.object(policy.LOG, 'isEnabledFor', return_value=True),
- mock.patch.object(policy.LOG, 'debug')
- ) as (is_e, dbg):
- policy.log_rule_list(common_policy.RuleCheck('rule', 'create_'))
- self.assertTrue(is_e.called)
- self.assertTrue(dbg.called)
+ @mock.patch.object(policy.LOG, 'isEnabledFor', return_value=True)
+ @mock.patch.object(policy.LOG, 'debug')
+ def test_log_rule_list(self, mock_debug, mock_is_e):
+ policy.log_rule_list(common_policy.RuleCheck('rule', 'create_'))
+ self.assertTrue(mock_is_e.called)
+ self.assertTrue(mock_debug.called)