--- /dev/null
+# Copyright 2014 NEC Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+def cmp_dpid(dpid_a, dpid_b):
+ """Compare two datapath IDs as hexadecimal int.
+
+ It returns True if equal, otherwise False.
+ """
+ try:
+ return (int(dpid_a, 16) == int(dpid_b, 16))
+ except Exception:
+ return False
from neutron.plugins.common import constants as svc_constants
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
+from neutron.plugins.nec.common import utils as necutils
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec.db import router as rdb
from neutron.plugins.nec import extensions
portinfo = self._validate_portinfo(profile)
portinfo_changed = 'ADD'
if cur_portinfo:
- if (portinfo['datapath_id'] == cur_portinfo.datapath_id and
+ if (necutils.cmp_dpid(portinfo['datapath_id'],
+ cur_portinfo.datapath_id) and
portinfo['port_no'] == cur_portinfo.port_no):
return
ndb.del_portinfo(context.session, port['id'])
id = p['id']
portinfo = ndb.get_portinfo(session, id)
if portinfo:
- if (portinfo.datapath_id == datapath_id and
+ if (necutils.cmp_dpid(portinfo.datapath_id, datapath_id) and
portinfo.port_no == p['port_no']):
LOG.debug(_("update_ports(): ignore unchanged portinfo in "
"port_added message (port_id=%s)."), id)
"due to portinfo for port_id=%s was not "
"registered"), id)
continue
- if portinfo.datapath_id != datapath_id:
+ if not necutils.cmp_dpid(portinfo.datapath_id, datapath_id):
LOG.debug(_("update_ports(): ignore port_removed message "
"received from different host "
"(registered_datapath_id=%(registered)s, "
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
+ def test_port_update_for_existing_port_with_different_padding_dpid(self):
+ ctx = context.get_admin_context()
+ with self.port() as port:
+ port_id = port['port']['id']
+ portinfo = {'id': port_id, 'port_no': 123}
+ self.rpcapi_update_ports(datapath_id='0x000000000000abcd',
+ added=[portinfo])
+ self.assertEqual(1, self.ofc.create_ofc_port.call_count)
+ self.assertEqual(0, self.ofc.delete_ofc_port.call_count)
+
+ profile_arg = {portbindings.PROFILE:
+ self._get_portinfo(datapath_id='0xabcd',
+ port_no=123)}
+ self._update('ports', port_id, {'port': profile_arg},
+ neutron_context=ctx)
+ # Check create_ofc_port/delete_ofc_port are not called.
+ self.assertEqual(1, self.ofc.create_ofc_port.call_count)
+ self.assertEqual(0, self.ofc.delete_ofc_port.call_count)
+
def test_port_create_portinfo_non_admin(self):
with self.network(set_context=True, tenant_id='test') as net1:
with self.subnet(network=net1) as subnet1:
--- /dev/null
+# Copyright 2014 NEC Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.plugins.nec.common import utils
+from neutron.tests import base
+
+
+class NecUtilsTest(base.BaseTestCase):
+
+ def test_cmp_dpid(self):
+ self.assertTrue(utils.cmp_dpid('0xabcd', '0xabcd'))
+ self.assertTrue(utils.cmp_dpid('abcd', '0xabcd'))
+ self.assertTrue(utils.cmp_dpid('0x000000000000abcd', '0xabcd'))
+ self.assertTrue(utils.cmp_dpid('0x000000000000abcd', '0x00abcd'))
+ self.assertFalse(utils.cmp_dpid('0x000000000000abcd', '0xabc0'))
+ self.assertFalse(utils.cmp_dpid('0x000000000000abcd', '0x00abc0'))
+
+ def test_cmp_dpid_with_exception(self):
+ self.assertFalse(utils.cmp_dpid('0xabcx', '0xabcx'))
+ self.assertFalse(utils.cmp_dpid(None, None))