]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix port creation verification of the port-security extension
authorYalei Wang <yalei.wang@intel.com>
Tue, 21 Apr 2015 07:17:13 +0000 (15:17 +0800)
committerYalei Wang <yalei.wang@intel.com>
Thu, 30 Apr 2015 17:08:29 +0000 (01:08 +0800)
When port is created, we should check the content of the security-group and
address-pairs like we do when port updated.
This patch also updates address-pairs testing unskipping some
port-security-related tests.

Change-Id: Ia27881a34ff99cad34c84764d2bf8a6cdf77af9c
Closes-Bug: #1446087

neutron/plugins/ml2/plugin.py
neutron/tests/unit/db/test_allowedaddresspairs_db.py
neutron/tests/unit/plugins/ml2/test_ext_portsecurity.py
neutron/tests/unit/plugins/ml2/test_plugin.py

index c668454ac8e15f9d97375c39034c44c3a3aadba4..726fcdf8e9c91eef054ce8fed070a2013de2515f 100644 (file)
@@ -63,7 +63,6 @@ from neutron.extensions import extra_dhcp_opt as edo_ext
 from neutron.extensions import portbindings
 from neutron.extensions import portsecurity as psec
 from neutron.extensions import providernet as provider
-from neutron.extensions import securitygroup as ext_sg
 from neutron.extensions import vlantransparent
 from neutron.i18n import _LE, _LI, _LW
 from neutron import manager
@@ -946,7 +945,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                          port_data[psec.PORTSECURITY])
 
         # allowed address pair checks
-        if attributes.is_attr_set(attrs.get(addr_pair.ADDRESS_PAIRS)):
+        if self._check_update_has_allowed_address_pairs(port):
             if not port_security:
                 raise addr_pair.AddressPairAndPortSecurityRequired()
         else:
@@ -955,7 +954,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
 
         if port_security:
             self._ensure_default_security_group_on_port(context, port)
-        elif attributes.is_attr_set(attrs.get(ext_sg.SECURITYGROUPS)):
+        elif self._check_update_has_security_groups(port):
             raise psec.PortSecurityAndIPRequiredForSecurityGroups()
 
     def _create_port_db(self, context, port):
index 6185d2e18803f2297cec3bd40130be716d1f89ed..2af639f70896a1c9909ebe73c7922f1af5820a45 100644 (file)
@@ -142,6 +142,18 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
             self.deserialize(self.fmt, res)
             self.assertEqual(res.status_int, 409)
 
+            address_pairs = []
+            res = self._create_port(self.fmt, net['network']['id'],
+                                    arg_list=('port_security_enabled',
+                                              addr_pair.ADDRESS_PAIRS,),
+                                    port_security_enabled=False,
+                                    allowed_address_pairs=address_pairs)
+            port = self.deserialize(self.fmt, res)
+            self.assertFalse(port['port'][psec.PORTSECURITY])
+            self.assertEqual(port['port'][addr_pair.ADDRESS_PAIRS],
+                             address_pairs)
+            self._delete('ports', port['port']['id'])
+
     def test_create_port_bad_mac(self):
         address_pairs = [{'mac_address': 'invalid_mac',
                           'ip_address': '10.0.0.1'}]
index 6180ff10e86da2caa278bab0336bcf628a9b1893..0def93842e31024f2db414285482d5d50ad1f9a0 100644 (file)
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from neutron.extensions import portsecurity as psec
 from neutron.plugins.ml2 import config
 from neutron.tests.unit.extensions import test_portsecurity as test_psec
 from neutron.tests.unit.plugins.ml2 import test_plugin
@@ -27,3 +28,18 @@ class PSExtDriverTestCase(test_plugin.Ml2PluginV2TestCase,
                                      self._extension_drivers,
                                      group='ml2')
         super(PSExtDriverTestCase, self).setUp()
+
+    def test_create_port_with_secgroup_none_and_port_security_false(self):
+        if self._skip_security_group:
+            self.skipTest("Plugin does not support security groups")
+        with self.network() as net:
+            with self.subnet(network=net):
+                res = self._create_port('json', net['network']['id'],
+                                        arg_list=('security_groups',
+                                                  'port_security_enabled'),
+                                        security_groups=[],
+                                        port_security_enabled=False)
+                self.assertEqual(res.status_int, 201)
+                port = self.deserialize('json', res)
+                self.assertFalse(port['port'][psec.PORTSECURITY])
+                self.assertEqual(port['port']['security_groups'], [])
index cc51029fdfa2ab558094e60f9d4f6797870515c8..aa6bde45849aecc52a2749f4492ee0da92182e32 100644 (file)
@@ -1133,7 +1133,12 @@ class TestMultiSegmentNetworks(Ml2PluginV2TestCase):
 
 class TestMl2AllowedAddressPairs(Ml2PluginV2TestCase,
                                  test_pair.TestAllowedAddressPairs):
+    _extension_drivers = ['port_security']
+
     def setUp(self, plugin=None):
+        config.cfg.CONF.set_override('extension_drivers',
+                                     self._extension_drivers,
+                                     group='ml2')
         super(test_pair.TestAllowedAddressPairs, self).setUp(
             plugin=PLUGIN_NAME)