for existing_port in existing_ports:
current_port = current_ports_dict.get(existing_port['id'])
if current_port:
- if sorted(existing_port['fixed_ips']) != (
- sorted(current_port['fixed_ips'])):
+ if (sorted(existing_port['fixed_ips'],
+ key=common_utils.safe_sort_key) !=
+ sorted(current_port['fixed_ips'],
+ key=common_utils.safe_sort_key)):
updated_ports[current_port['id']] = current_port
return updated_ports
"""Utilities and helper functions."""
+import collections
import datetime
import decimal
import errno
return set(a) == set(b)
+def safe_sort_key(value):
+ """Return value hash or build one for dictionaries."""
+ if isinstance(value, collections.Mapping):
+ return sorted(value.items())
+ return value
+
+
def dict2str(dic):
return ','.join("%s=%s" % (key, val)
for key, val in sorted(six.iteritems(dic)))
import unittest
+from neutron.common import utils
+
def setup_mock_calls(mocked_call, expected_calls_and_values):
return_values = [call[1] for call in expected_calls_and_values]
def __eq__(self, other):
if not isinstance(other, list):
return False
- return sorted(self) == sorted(other)
+ return (sorted(self, key=utils.safe_sort_key) ==
+ sorted(other, key=utils.safe_sort_key))
def __neq__(self, other):
return not self == other
for k in keys:
self.assertIn(k, resource[res_name])
if isinstance(keys[k], list):
- self.assertEqual(sorted(resource[res_name][k]),
- sorted(keys[k]))
+ self.assertEqual(
+ sorted(resource[res_name][k], key=utils.safe_sort_key),
+ sorted(keys[k], key=utils.safe_sort_key))
else:
self.assertEqual(resource[res_name][k], keys[k])
req = self.new_update_request('subnets', data,
res['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
- self.assertEqual(sorted(res['subnet']['host_routes']),
- sorted(host_routes))
+ self.assertEqual(
+ sorted(res['subnet']['host_routes'], key=utils.safe_sort_key),
+ sorted(host_routes, key=utils.safe_sort_key))
self.assertEqual(res['subnet']['dns_nameservers'],
dns_nameservers)
from webob import exc
from neutron.common import constants
+from neutron.common import utils
from neutron.db import extraroute_db
from neutron.extensions import extraroute
from neutron.extensions import l3
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
- self.assertEqual(sorted(body['router']['routes']),
- sorted(routes))
+ self.assertEqual(
+ sorted(body['router']['routes'],
+ key=utils.safe_sort_key),
+ sorted(routes, key=utils.safe_sort_key))
self._routes_update_cleanup(p['port']['id'],
None, r['router']['id'], [])
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes_orig)
- self.assertEqual(sorted(body['router']['routes']),
- sorted(routes_orig))
+ self.assertEqual(
+ sorted(body['router']['routes'],
+ key=utils.safe_sort_key),
+ sorted(routes_orig, key=utils.safe_sort_key))
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes_left,
skip_add=True)
- self.assertEqual(sorted(body['router']['routes']),
- sorted(routes_left))
+ self.assertEqual(
+ sorted(body['router']['routes'],
+ key=utils.safe_sort_key),
+ sorted(routes_left, key=utils.safe_sort_key))
self._routes_update_cleanup(p['port']['id'],
None, r['router']['id'], [])
from neutron.api import extensions as neutron_extensions
from neutron.api.v2 import attributes
+from neutron.common import utils
from neutron import context
import neutron.db.api as db
from neutron.extensions import portbindings
is_admin=False)
res = self._list(resource, neutron_context=ctx)
self.assertEqual(len(expected_profiles), len(res[resource]))
- profiles = sorted(res[resource])
+ profiles = sorted(res[resource], key=utils.safe_sort_key)
for i in range(len(profiles)):
self.assertEqual(expected_profiles[i].id,
profiles[i]['id'])
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
subnet = self.deserialize(self.fmt, req.get_response(self.api))
- self.assertEqual(sorted(subnet['subnet']['host_routes']),
- sorted(host_routes))
+ self.assertEqual(
+ sorted(subnet['subnet']['host_routes'],
+ key=utils.safe_sort_key),
+ sorted(host_routes, key=utils.safe_sort_key))
self.assertEqual(sorted(subnet['subnet']['dns_nameservers']),
sorted(dns_nameservers))
# In N1K we need to delete the subnet before the network
neutron.tests.unit.plugins.brocade.test_brocade_db \
neutron.tests.unit.plugins.brocade.test_brocade_plugin \
neutron.tests.unit.plugins.brocade.test_brocade_vlan \
+ neutron.tests.unit.plugins.embrane.test_embrane_neutron_plugin \
neutron.tests.unit.plugins.oneconvergence.test_nvsd_agent \
+ neutron.tests.unit.plugins.oneconvergence.test_nvsd_plugin \
neutron.tests.unit.plugins.oneconvergence.test_plugin_helper \
neutron.tests.unit.plugins.oneconvergence.test_nvsdlib \
neutron.tests.unit.plugins.ibm.test_sdnve_agent \
neutron.tests.unit.scheduler.test_dhcp_agent_scheduler \
neutron.tests.unit.db.test_agentschedulers_db \
neutron.tests.unit.db.test_allowedaddresspairs_db \
+ neutron.tests.unit.db.test_db_base_plugin_v2 \
neutron.tests.unit.db.test_ipam_backend_mixin \
neutron.tests.unit.db.test_l3_dvr_db \
neutron.tests.unit.db.test_l3_hamode_db \
neutron.tests.unit.extensions.test_flavors \
neutron.tests.unit.extensions.test_l3_ext_gw_mode \
neutron.tests.unit.extensions.test_extra_dhcp_opt \
+ neutron.tests.unit.extensions.test_extraroute \
neutron.tests.unit.extensions.test_netmtu \
neutron.tests.unit.extensions.test_vlantransparent \
neutron.tests.unit.extensions.extendedattribute \