]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
mock.assert_called_once() is not a valid method
authorJacek Swiderski <jacek.swiderski@codilime.com>
Tue, 23 Sep 2014 12:35:06 +0000 (14:35 +0200)
committerJacek Swiderski <jacek.swiderski@codilime.com>
Mon, 13 Oct 2014 10:36:49 +0000 (12:36 +0200)
mock.assert_called_once() is a no-op that tests nothing. Instead
mock.assert_called_once_with() should be used (or use
assertEqual(1, mock_obj.call_count) if you don't want to check
parameters).

Borrowed HACKING rule from Davanum Srinivas's nova patch to
prevent it from appearing again.

Change-Id: Idac1d3c89c07e13c9a209663f4e557fcb7547821
Closes-Bug: #1365751
Closes-Bug: #1300265

HACKING.rst
neutron/hacking/checks.py
neutron/tests/unit/ml2/drivers/cisco/apic/test_cisco_apic_mechanism_driver.py
neutron/tests/unit/ml2/drivers/cisco/apic/test_cisco_apic_sync.py
neutron/tests/unit/services/l3_router/test_l3_apic_plugin.py
neutron/tests/unit/services/loadbalancer/drivers/radware/test_plugin_driver.py
neutron/tests/unit/test_hacking.py
neutron/tests/unit/test_l3_agent.py

index a0595952cae4968abbaa8ba08f6f60b948ec9906..b12291a1c021b1852c89b030af773b1e880fa8d1 100644 (file)
@@ -12,6 +12,7 @@ Neutron Specific Commandments
 - [N321] Validate that jsonutils module is used instead of json
 - [N322] We do not use @authors tags in source files. We have git to track
   authorship.
+- [N323] assert_called_once() is not a valid method
 
 Creating Unit Tests
 -------------------
index a170a0d4be8286eefbac615d1a03c28d1b7870cb..ebb99f0d00fa42b00d1c65df81de8e01733f5385 100644 (file)
@@ -79,7 +79,19 @@ def no_author_tags(physical_line):
             return pos, "N322: Don't use author tags"
 
 
+def check_assert_called_once(logical_line, filename):
+    msg = ("N323: assert_called_once is a no-op. please use "
+           "assert_called_once_with to test with explicit parameters or an "
+           "assertEqual with call_count.")
+
+    if 'neutron/tests/' in filename:
+        pos = logical_line.find('.assert_called_once(')
+        if pos != -1:
+            yield (pos, msg)
+
+
 def factory(register):
     register(validate_log_translations)
     register(use_jsonutils)
     register(no_author_tags)
+    register(check_assert_called_once)
index 7c41361bf3459a97336cf1c8302cdcb3e56ba925..e2f50716ce8d64c7f584eb98455f3f741ceb2a80 100644 (file)
@@ -68,10 +68,11 @@ class TestCiscoApicMechDriver(base.BaseTestCase,
         self.driver.apic_manager.apic.transaction = self.fake_transaction
 
     def test_initialize(self):
-        mgr = self.driver.apic_manager
         self.driver.initialize()
-        mgr.ensure_infra_created_on_apic.assert_called_once()
-        mgr.ensure_bgp_pod_policy_created_on_apic.assert_called_once()
+        mgr = self.driver.apic_manager
+        self.assertEqual(1, mgr.ensure_infra_created_on_apic.call_count)
+        self.assertEqual(1,
+                         mgr.ensure_bgp_pod_policy_created_on_apic.call_count)
 
     def test_update_port_postcommit(self):
         net_ctx = self._get_network_context(mocked.APIC_TENANT,
@@ -99,7 +100,7 @@ class TestCiscoApicMechDriver(base.BaseTestCase,
         self.driver.update_port_postcommit(port_ctx)
         mgr.get_router_contract.assert_called_once_with(
             port_ctx.current['device_id'])
-        mgr.ensure_context_enforced.assert_called_once()
+        self.assertEqual(1, mgr.ensure_context_enforced.call_count)
         mgr.ensure_external_routed_network_created.assert_called_once_with(
             mocked.APIC_NETWORK, transaction='transaction')
         mgr.ensure_logical_node_profile_created.assert_called_once_with(
@@ -119,17 +120,6 @@ class TestCiscoApicMechDriver(base.BaseTestCase,
             mocked.APIC_NETWORK, mgr.get_router_contract.return_value,
             transaction='transaction')
 
-    def test_update_gw_port_postcommit_fail_contract_create(self):
-        net_ctx = self._get_network_context(mocked.APIC_TENANT,
-                                            mocked.APIC_NETWORK,
-                                            TEST_SEGMENT1, external=True)
-        port_ctx = self._get_port_context(mocked.APIC_TENANT,
-                                          mocked.APIC_NETWORK,
-                                          'vm1', net_ctx, HOST_ID1, gw=True)
-        mgr = self.driver.apic_manager
-        self.driver.update_port_postcommit(port_ctx)
-        mgr.ensure_external_routed_network_deleted.assert_called_once()
-
     def test_create_network_postcommit(self):
         ctx = self._get_network_context(mocked.APIC_TENANT,
                                         mocked.APIC_NETWORK,
index 3780e57de21328ff4066d58b98266e2800dd770b..b687f47888399d2b8f1cda233d48772687a86c1b 100644 (file)
@@ -62,10 +62,10 @@ class TestCiscoApicSync(base.BaseTestCase):
         sync.core_plugin.get_ports.return_value = [{'id': 'port',
                                                     'network_id': 'net'}]
         sync.sync_base()
-        self.driver.create_network_postcommit.assert_called_once()
-        self.driver.create_subnet_postcommit.assert_called_once()
-        self.get_locked_port_and_binding.assert_called_once()
-        self.driver.create_port_postcommit.assert_called_once()
+        self.assertEqual(1, self.driver.create_network_postcommit.call_count)
+        self.assertEqual(1, self.driver.create_subnet_postcommit.call_count)
+        self.assertEqual(1, self.get_locked_port_and_binding.call_count)
+        self.assertEqual(1, self.driver.create_port_postcommit.call_count)
 
     def test_sync_router(self):
         sync = apic_sync.ApicRouterSynchronizer(self.driver)
@@ -74,4 +74,5 @@ class TestCiscoApicSync(base.BaseTestCase):
                                                     'network_id': 'net',
                                                     'device_id': 'dev'}]
         sync.sync_router()
-        self.driver.add_router_interface_postcommit.assert_called_once()
+        self.assertEqual(
+            1, self.driver.add_router_interface_postcommit.call_count)
index 79fb88d4e9590bd3935c2d17bbecd84ddbdd7f9e..b2ead4e693613605a42d71603e7e06833d9b19d4 100644 (file)
@@ -120,7 +120,8 @@ class TestCiscoApicL3Plugin(testlib_api.SqlTestCase,
         mgr = self.plugin.manager
         self.plugin.remove_router_interface(self.context, ROUTER,
                                             interface_info)
-        mgr.delete_contract_for_epg.assert_called_once()
+        mgr.remove_router_interface.assert_called_once_with(
+            mocked.APIC_TENANT, mocked.APIC_ROUTER, mocked.APIC_NETWORK)
 
     def test_add_router_interface_subnet(self):
         self._test_add_router_interface(self.interface_info['subnet'])
index 31f6cb2f1831e065f0ccaeeea7b290bf5ed9b5a4..ba5c968bd896947c5d91a9c2fdbff682ccb4303e 100644 (file)
@@ -19,6 +19,7 @@ import mock
 from oslo.config import cfg
 from six.moves import queue as Queue
 
+from neutron.api.v2 import attributes
 from neutron import context
 from neutron.extensions import loadbalancer
 from neutron import manager
@@ -155,6 +156,16 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
         """Call _get_pip twice and verify that a Port is created once."""
         port_dict = {'fixed_ips': [{'subnet_id': '10.10.10.10',
                                     'ip_address': '11.11.11.11'}]}
+        port_data = {
+            'tenant_id': 'tenant_id',
+            'name': 'port_name',
+            'network_id': 'network_id',
+            'mac_address': attributes.ATTR_NOT_SPECIFIED,
+            'admin_state_up': False,
+            'device_id': '',
+            'device_owner': 'neutron:' + constants.LOADBALANCER,
+            'fixed_ips': [{'subnet_id': '10.10.10.10'}]
+        }
         self.plugin_instance._core_plugin.get_ports = mock.Mock(
             return_value=[])
         self.plugin_instance._core_plugin.create_port = mock.Mock(
@@ -163,15 +174,18 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
         radware_driver._get_pip(context.get_admin_context(),
                                 'tenant_id', 'port_name',
                                 'network_id', '10.10.10.10')
-        self.plugin_instance._core_plugin.get_ports.assert_called_once()
-        self.plugin_instance._core_plugin.create_port.assert_called_once()
+        self.plugin_instance._core_plugin.get_ports.assert_called_once_with(
+                mock.ANY, filters={'name': ['port_name']})
+        self.plugin_instance._core_plugin.create_port.assert_called_once_with(
+                mock.ANY, {'port': port_data})
         self.plugin_instance._core_plugin.create_port.reset_mock()
         self.plugin_instance._core_plugin.get_ports.reset_mock()
         self.plugin_instance._core_plugin.get_ports.return_value = [port_dict]
         radware_driver._get_pip(context.get_admin_context(),
                                 'tenant_id', 'port_name',
                                 'network_id', '10.10.10.10')
-        self.plugin_instance._core_plugin.get_ports.assert_called_once()
+        self.plugin_instance._core_plugin.get_ports.assert_called_once_with(
+                mock.ANY, filters={'name': ['port_name']})
         self.assertFalse(self.plugin_instance._core_plugin.create_port.called)
 
     def test_rest_client_recover_was_called(self):
@@ -181,7 +195,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
         radware_driver.rest_client._call = self.orig__call
         self.assertRaises(r_exc.RESTRequestFailure,
                           radware_driver._verify_workflow_templates)
-        self.recover_mock.assert_called_once()
+        self.recover_mock.assert_called_once_with('GET',
+                                                  '/api/workflowTemplate',
+                                                  None, None, False)
 
     def test_rest_client_flip_servers(self):
         radware_driver = self.plugin_instance.drivers['radware']
index 895dbe9101931941c944690bdc8c8e683f2527d0..00ffd7de7084db7cf2a0f110a6871b0e990d0847 100644 (file)
@@ -79,3 +79,21 @@ class HackingTestCase(base.BaseTestCase):
         self.assertEqual(2, checks.no_author_tags("# author: pele")[0])
         self.assertEqual(2, checks.no_author_tags("# Author: pele")[0])
         self.assertEqual(3, checks.no_author_tags(".. moduleauthor:: pele")[0])
+
+    def test_assert_called_once(self):
+        fail_code = """
+               mock = Mock()
+               mock.method(1, 2, 3, test='wow')
+               mock.method.assert_called_once()
+               """
+        pass_code = """
+               mock = Mock()
+               mock.method(1, 2, 3, test='wow')
+               mock.method.assert_called_once_with()
+               """
+        self.assertEqual(
+            1, len(list(checks.check_assert_called_once(fail_code,
+                                            "neutron/tests/test_assert.py"))))
+        self.assertEqual(
+            0, len(list(checks.check_assert_called_once(pass_code,
+                                            "neutron/tests/test_assert.py"))))
index 264804a596e1d2d637358ac52ba1f7e46bbee92f..47de4c35f7e419eeb18540923a2ebade5a77292b 100644 (file)
@@ -1745,25 +1745,25 @@ vrrp_instance VR_1 {
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         agent._queue = mock.Mock()
         agent.router_deleted(None, FAKE_ID)
-        agent._queue.add.assert_called_once()
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_routers_updated(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         agent._queue = mock.Mock()
         agent.routers_updated(None, [FAKE_ID])
-        agent._queue.add.assert_called_once()
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_removed_from_agent(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         agent._queue = mock.Mock()
         agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
-        agent._queue.add.assert_called_once()
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_added_to_agent(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         agent._queue = mock.Mock()
         agent.router_added_to_agent(None, [FAKE_ID])
-        agent._queue.add.assert_called_once()
+        self.assertEqual(1, agent._queue.add.call_count)
 
     def test_destroy_fip_namespace(self):
         class FakeDev(object):