]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Avoid testing code duplication which introduced testing bugs
authorMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 5 Sep 2014 12:10:35 +0000 (14:10 +0200)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 5 Sep 2014 14:38:17 +0000 (16:38 +0200)
SecurityGroupAgentEnhancedRpcTestCase duplicated code in
SecurityGroupAgentRpcTestCase setUp, also
TestSecurityGroupAgentEnhancedRpcWithIptables duplicated
code from TestSecurityGroupAgentWithIptables setUp()
introducing bugs by improper initialization, like a missing
    self.iptables.use_ipv6 = True
which in combination with tests.unit.test_ipv6.TestIsEnabled
produced inconsistent testing results.

Change-Id: Ic0b596b6fe1d8273df197e9426abad818832c00a
Closes-bug: #1365829

neutron/tests/unit/test_security_groups_rpc.py

index c841bf322e828ff174c9d725d3d4fa740153904c..f5d4be886d635b135ebbb3dd063bac26fa0c45f1 100644 (file)
@@ -943,21 +943,15 @@ class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
                          'NoopFirewallDriver')
 
 
-class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
+class BaseSecurityGroupAgentRpcTestCase(base.BaseTestCase):
     def setUp(self, defer_refresh_firewall=False):
-        super(SecurityGroupAgentRpcTestCase, self).setUp()
-        cfg.CONF.set_default('firewall_driver',
-                             'neutron.agent.firewall.NoopFirewallDriver',
-                             group='SECURITYGROUP')
+        super(BaseSecurityGroupAgentRpcTestCase, self).setUp()
+        set_firewall_driver(FIREWALL_NOOP_DRIVER)
         self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
         self.agent.context = None
         mock.patch('neutron.agent.linux.iptables_manager').start()
         self.agent.root_helper = 'sudo'
-        rpc = mock.Mock()
-        rpc.security_group_info_for_devices.side_effect = (
-            messaging.UnsupportedVersion('1.2'))
-        self.agent.plugin_rpc = rpc
-
+        self.agent.plugin_rpc = mock.Mock()
         self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
         self.firewall = mock.Mock()
         firewall_object = firewall_base.FirewallDriver()
@@ -970,9 +964,18 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
                                                       'fake_sgid1',
                                                       'remote_group_id':
                                                       'fake_sgid2'}]}
-        fake_devices = {'fake_device': self.fake_device}
-        self.firewall.ports = fake_devices
-        rpc.security_group_rules_for_devices.return_value = fake_devices
+        self.firewall.ports = {'fake_device': self.fake_device}
+
+
+class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
+    def setUp(self, defer_refresh_firewall=False):
+        super(SecurityGroupAgentRpcTestCase, self).setUp(
+            defer_refresh_firewall)
+        rpc = self.agent.plugin_rpc
+        rpc.security_group_info_for_devices.side_effect = (
+                messaging.UnsupportedVersion('1.2'))
+        rpc.security_group_rules_for_devices.return_value = (
+            self.firewall.ports)
 
     def test_prepare_and_remove_devices_filter(self):
         self.agent.prepare_devices_filter(['fake_device'])
@@ -1041,39 +1044,20 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
         self.firewall.assert_has_calls([])
 
 
-class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase):
+class SecurityGroupAgentEnhancedRpcTestCase(
+    BaseSecurityGroupAgentRpcTestCase):
+
     def setUp(self, defer_refresh_firewall=False):
-        super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp()
-        cfg.CONF.set_default('firewall_driver',
-                             'neutron.agent.firewall.NoopFirewallDriver',
-                             group='SECURITYGROUP')
-        self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
-        self.agent.context = None
-        mock.patch('neutron.agent.linux.iptables_manager').start()
-        self.agent.root_helper = 'sudo'
-        rpc = mock.Mock()
-        self.agent.plugin_rpc = rpc
-        self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
-        self.firewall = mock.Mock()
-        firewall_object = firewall_base.FirewallDriver()
-        self.firewall.defer_apply.side_effect = firewall_object.defer_apply
-        self.agent.firewall = self.firewall
-        self.fake_device = {'device': 'fake_device',
-                            'security_groups': ['fake_sgid1', 'fake_sgid2'],
-                            'security_group_source_groups': ['fake_sgid2'],
-                            'security_group_rules': [{'security_group_id':
-                                                      'fake_sgid1',
-                                                      'remote_group_id':
-                                                      'fake_sgid2'}]}
-        fake_devices = {'fake_device': self.fake_device}
+        super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp(
+            defer_refresh_firewall=defer_refresh_firewall)
         fake_sg_info = {
             'security_groups': {
                 'fake_sgid1': [
                     {'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []},
             'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}},
-            'devices': fake_devices}
-        self.firewall.ports = fake_devices
-        rpc.security_group_info_for_devices.return_value = fake_sg_info
+            'devices': self.firewall.ports}
+        self.agent.plugin_rpc.security_group_info_for_devices.return_value = (
+            fake_sg_info)
 
     def test_prepare_and_remove_devices_filter_enhanced_rpc(self):
         self.agent.prepare_devices_filter(['fake_device'])
@@ -2015,16 +1999,13 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
     PHYSDEV_INGRESS = 'physdev-out'
     PHYSDEV_EGRESS = 'physdev-in'
 
-    def setUp(self, defer_refresh_firewall=False):
+    def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True):
         super(TestSecurityGroupAgentWithIptables, self).setUp()
         config.register_root_helper(cfg.CONF)
         cfg.CONF.set_override(
             'lock_path',
             '$state_path/lock')
-        cfg.CONF.set_override(
-            'firewall_driver',
-            self.FIREWALL_DRIVER,
-            group='SECURITYGROUP')
+        set_firewall_driver(self.FIREWALL_DRIVER)
 
         self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
         self.agent.context = None
@@ -2033,8 +2014,10 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
         self.agent.root_helper = 'sudo'
         self.rpc = mock.Mock()
         self.agent.plugin_rpc = self.rpc
-        self.rpc.security_group_info_for_devices.side_effect = (
-            messaging.UnsupportedVersion('1.2'))
+
+        if test_rpc_v1_1:
+            self.rpc.security_group_info_for_devices.side_effect = (
+                messaging.UnsupportedVersion('1.2'))
 
         self.agent.init_firewall(
             defer_refresh_firewall=defer_refresh_firewall)
@@ -2216,21 +2199,8 @@ class TestSecurityGroupAgentEnhancedRpcWithIptables(
     TestSecurityGroupAgentWithIptables):
     def setUp(self, defer_refresh_firewall=False):
         super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
-            defer_refresh_firewall)
-        self.rpc = mock.Mock()
-        self.agent.plugin_rpc = self.rpc
+            defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False)
         self.sg_info = self.rpc.security_group_info_for_devices
-        self.agent.init_firewall(
-            defer_refresh_firewall=defer_refresh_firewall)
-        self.iptables = self.agent.firewall.iptables
-        self.iptables_execute = mock.patch.object(self.iptables,
-                                                  "execute").start()
-        self.iptables_execute_return_values = []
-        self.expected_call_count = 0
-        self.expected_calls = []
-        self.expected_process_inputs = []
-        self.iptables_execute.side_effect = (
-            self.iptables_execute_return_values)
 
         rule1 = [{'direction': 'ingress',
                   'protocol': const.PROTO_NAME_UDP,