]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Allow None for binding:profile attribute
authorAkihiro MOTOKI <motoki@da.jp.nec.com>
Tue, 3 Sep 2013 12:57:53 +0000 (21:57 +0900)
committerAkihiro MOTOKI <motoki@da.jp.nec.com>
Wed, 4 Sep 2013 16:16:52 +0000 (01:16 +0900)
We need to pass None in binding:profile to allow an administrator
to clear binding:profile attribute.

Closes-Bug: #1220011

Adds dedicated unit tests to the plugins which uses binding:profile
attribute (Mellanox and NEC plugins at now).

This commit also adds common unit tests for binding:profile to
the common PortBindingTestCase class.
- create_port with binding:profile whose value is None or {}
- update_port with binding:profile whose value is None or {}
- Reject binding:profile from non-admin user

Note that _make_port() in BigSwitch plugin test is updated
to allow passing arg_list() from the base test class.

Fix a bug in NEC plugin that 500 is returned when putting
binding:profile None to a port whose binding:profile is
already None (Closes-Bug: #1220720)

Change-Id: I146afe961cd445a023adc7233588d8034fdb8437

neutron/extensions/portbindings.py
neutron/plugins/nec/nec_plugin.py
neutron/tests/unit/_test_extension_portbindings.py
neutron/tests/unit/bigswitch/test_restproxy_plugin.py
neutron/tests/unit/mlnx/test_mlnx_plugin.py
neutron/tests/unit/nec/test_portbindings.py

index e09d04642dad800fbb3f3d9022c4bc50e1be9491..425f8dc38c5ec5a0c26f587796a16862436be302 100644 (file)
@@ -63,7 +63,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
         PROFILE: {'allow_post': True, 'allow_put': True,
                   'default': attributes.ATTR_NOT_SPECIFIED,
                   'enforce_policy': True,
-                  'validate': {'type:dict': None},
+                  'validate': {'type:dict_or_none': None},
                   'is_visible': True},
         CAPABILITIES: {'allow_post': False, 'allow_put': False,
                        'default': attributes.ATTR_NOT_SPECIFIED,
index df3ee1ef628e71a6e2ee0eef4894089258c622a5..cefcac1c01a4d7f91830bd17ef37b4f0c527e9dc 100644 (file)
@@ -430,6 +430,9 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
             portinfo_changed = 'DEL'
             portinfo = None
             ndb.del_portinfo(context.session, port['id'])
+        else:
+            portinfo = None
+            portinfo_changed = None
         self._extend_port_dict_binding_portinfo(port, portinfo)
         return portinfo_changed
 
index ae41fdb6c50beb84891bb7c54242d3f7987aa702..41fb2d6b70757e749840f3d872af901cd1a39ba9 100644 (file)
@@ -48,6 +48,12 @@ class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
         self.assertFalse(portbindings.VIF_TYPE in port)
         self.assertFalse(portbindings.CAPABILITIES in port)
 
+    def _get_non_admin_context(self):
+        return context.Context(user_id=None,
+                               tenant_id=self._tenant_id,
+                               is_admin=False,
+                               read_deleted="no")
+
     def test_port_vif_details(self):
         with self.port(name='name') as port:
             port_id = port['port']['id']
@@ -58,10 +64,7 @@ class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
             port = self._show('ports', port_id, neutron_context=ctx)['port']
             self._check_response_portbindings(port)
             # By default user is admin - now test non admin user
-            ctx = context.Context(user_id=None,
-                                  tenant_id=self._tenant_id,
-                                  is_admin=False,
-                                  read_deleted="no")
+            ctx = self._get_non_admin_context()
             non_admin_port = self._show(
                 'ports', port_id, neutron_context=ctx)['port']
             self._check_response_no_portbindings(non_admin_port)
@@ -76,15 +79,81 @@ class PortBindingsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
             for port in ports:
                 self._check_response_portbindings(port)
             # By default user is admin - now test non admin user
-            ctx = context.Context(user_id=None,
-                                  tenant_id=self._tenant_id,
-                                  is_admin=False,
-                                  read_deleted="no")
+            ctx = self._get_non_admin_context()
             ports = self._list('ports', neutron_context=ctx)['ports']
             self.assertEqual(len(ports), 2)
             for non_admin_port in ports:
                 self._check_response_no_portbindings(non_admin_port)
 
+    def _check_default_port_binding_profile(self, port):
+        # For plugins which does not use binding:profile attr
+        # we just check an operation for the port succeed.
+        self.assertIn('id', port)
+
+    def _test_create_port_binding_profile(self, profile):
+        profile_arg = {portbindings.PROFILE: profile}
+        with self.port(arg_list=(portbindings.PROFILE,),
+                       **profile_arg) as port:
+            self._check_default_port_binding_profile(port['port'])
+
+    def test_create_port_binding_profile_none(self):
+        self._test_create_port_binding_profile(None)
+
+    def test_create_port_binding_profile_with_empty_dict(self):
+        self._test_create_port_binding_profile({})
+
+    def _test_update_port_binding_profile(self, profile):
+        profile_arg = {portbindings.PROFILE: profile}
+        with self.port() as port:
+            # print "(1) %s" % port
+            self._check_default_port_binding_profile(port['port'])
+            port_id = port['port']['id']
+            ctx = context.get_admin_context()
+            port = self._update('ports', port_id, {'port': profile_arg},
+                                neutron_context=ctx)['port']
+            self._check_default_port_binding_profile(port)
+
+    def test_update_port_binding_profile_none(self):
+        self._test_update_port_binding_profile(None)
+
+    def test_update_port_binding_profile_with_empty_dict(self):
+        self._test_update_port_binding_profile({})
+
+    def test_port_create_portinfo_non_admin(self):
+        profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
+        with self.network(set_context=True, tenant_id='test') as net1:
+            with self.subnet(network=net1) as subnet1:
+                # succeed without binding:profile
+                with self.port(subnet=subnet1,
+                               set_context=True, tenant_id='test'):
+                    pass
+                # fail with binding:profile
+                try:
+                    with self.port(subnet=subnet1,
+                                   expected_res_status=403,
+                                   arg_list=(portbindings.PROFILE,),
+                                   set_context=True, tenant_id='test',
+                                   **profile_arg):
+                        pass
+                except exc.HTTPClientError:
+                    pass
+
+    def test_port_update_portinfo_non_admin(self):
+        profile_arg = {portbindings.PROFILE: {'dummy': 'dummy'}}
+        with self.network() as net1:
+            with self.subnet(network=net1) as subnet1:
+                with self.port(subnet=subnet1) as port:
+                    # By default user is admin - now test non admin user
+                    # Note that 404 is returned when prohibit by policy.
+                    # See comment for PolicyNotAuthorized except clause
+                    # in update() in neutron.api.v2.base.Controller.
+                    port_id = port['port']['id']
+                    ctx = self._get_non_admin_context()
+                    port = self._update('ports', port_id,
+                                        {'port': profile_arg},
+                                        expected_code=404,
+                                        neutron_context=ctx)
+
 
 class PortBindingsHostTestCaseMixin(object):
     fmt = 'json'
index 34dd567d54bb3b7f3bb5d4f1fd290a08be663042..6107d9369497f3405d9e8389313c5dea4b035048 100644 (file)
@@ -222,9 +222,12 @@ class TestBigSwitchVIFOverride(test_plugin.TestPortsV2,
             res = self.deserialize(self.fmt, req.get_response(self.api))
             self.assertEqual(res['port']['binding:vif_type'], self.VIF_TYPE)
 
-    def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs):
+    def _make_port(self, fmt, net_id, expected_res_status=None, arg_list=None,
+                   **kwargs):
+        arg_list = arg_list or ()
+        arg_list += ('binding:host_id', )
         res = self._create_port(fmt, net_id, expected_res_status,
-                                ('binding:host_id', ), **kwargs)
+                                arg_list, **kwargs)
         # Things can go wrong - raise HTTP exc with res code only
         # so it can be caught by unit tests
         if res.status_int >= 400:
index 0aa24b554a31399e9b51c2ab52c1bf2653adbe26..f329a0203055ce06bc60a7d56aece8f25f2a05a4 100644 (file)
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from webob import exc
+
+from neutron.extensions import portbindings
 from neutron.plugins.mlnx.common import constants
 from neutron.tests.unit import _test_extension_portbindings as test_bindings
 from neutron.tests.unit import test_db_plugin as test_plugin
@@ -53,6 +56,45 @@ class TestMlnxPortBinding(MlnxPluginV2TestCase,
     VIF_TYPE = constants.VIF_TYPE_DIRECT
     HAS_PORT_FILTER = False
 
+    def _check_default_port_binding_profole(self, port,
+                                            expected_vif_type=None):
+        if expected_vif_type is None:
+            expected_vif_type = constants.VIF_TYPE_DIRECT
+        p = port['port']
+        self.assertIn('id', p)
+        self.assertEqual(expected_vif_type, p[portbindings.VIF_TYPE])
+        self.assertEqual({'physical_network': 'default'},
+                         p[portbindings.PROFILE])
+
+    def test_create_port_no_binding_profile(self):
+        with self.port() as port:
+            self._check_default_port_binding_profole(port)
+
+    def test_create_port_binding_profile_none(self):
+        profile_arg = {portbindings.PROFILE: None}
+        with self.port(arg_list=(portbindings.PROFILE,),
+                       **profile_arg) as port:
+            self._check_default_port_binding_profole(port)
+
+    def test_create_port_binding_profile_vif_type(self):
+        for vif_type in [constants.VIF_TYPE_HOSTDEV,
+                         constants.VIF_TYPE_DIRECT]:
+            profile_arg = {portbindings.PROFILE:
+                           {constants.VNIC_TYPE: vif_type}}
+            with self.port(arg_list=(portbindings.PROFILE,),
+                           **profile_arg) as port:
+                self._check_default_port_binding_profole(
+                    port, expected_vif_type=vif_type)
+
+    def test_create_port_binding_profile_with_empty_dict(self):
+        profile_arg = {portbindings.PROFILE: {}}
+        try:
+            with self.port(arg_list=(portbindings.PROFILE,),
+                           expected_res_status=400, **profile_arg):
+                pass
+        except exc.HTTPClientError:
+            pass
+
 
 class TestMlnxPortBindingNoSG(TestMlnxPortBinding):
     HAS_PORT_FILTER = False
index 7f2da8dbde7a629d73d95026270653bde7c53abb..2c096af8fa07814ce1fb97e8ca2f282ec32b074c 100644 (file)
@@ -149,13 +149,34 @@ class TestNecPortBindingPortInfo(test_nec_plugin.NecPluginV2TestCase):
             self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
             self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
 
-            # delete portinfo
+            # delete portinfo with an empty dict
             profile_arg = {portbindings.PROFILE: {}}
             port = self._update('ports', port_id, {'port': profile_arg},
                                 neutron_context=ctx)['port']
             self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
             self.assertEqual(self.ofc.delete_ofc_port.call_count, 2)
 
+    def test_port_update_portinfo_detail_clear_with_none(self):
+        with self.port() as port:
+            self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+            self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+            port_id = port['port']['id']
+            ctx = context.get_admin_context()
+
+            # add portinfo
+            profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+            port = self._update('ports', port_id, {'port': profile_arg},
+                                neutron_context=ctx)['port']
+            self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+            self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+
+            # delete portinfo with None
+            profile_arg = {portbindings.PROFILE: None}
+            port = self._update('ports', port_id, {'port': profile_arg},
+                                neutron_context=ctx)['port']
+            self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+            self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
+
     def test_port_create_portinfo_with_empty_dict(self):
         profile_arg = {portbindings.PROFILE: {}}
         with self.port(arg_list=(portbindings.PROFILE,),
@@ -174,6 +195,24 @@ class TestNecPortBindingPortInfo(test_nec_plugin.NecPluginV2TestCase):
             self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
             self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
 
+    def test_port_create_portinfo_with_none(self):
+        profile_arg = {portbindings.PROFILE: None}
+        with self.port(arg_list=(portbindings.PROFILE,),
+                       **profile_arg) as port:
+            port_id = port['port']['id']
+
+            # Check a response of create_port
+            self._check_response_portbinding_no_profile(port['port'])
+            self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
+            # add portinfo
+            ctx = context.get_admin_context()
+            profile_arg = {portbindings.PROFILE: self._get_portinfo()}
+            port = self._update('ports', port_id, {'port': profile_arg},
+                                neutron_context=ctx)['port']
+            self._check_response_portbinding_profile(port)
+            self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+            self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+
     def test_port_create_portinfo_non_admin(self):
         with self.network(set_context=True, tenant_id='test') as net1:
             with self.subnet(network=net1) as subnet1: