- [N324] Prevent use of deprecated contextlib.nested.
- [N325] Python 3: Do not use xrange.
- [N326] Python 3: do not use basestring.
+- [N327] Python 3: do not use dict.iteritems.
Creating Unit Tests
-------------------
raise exceptions.InvalidInput(error_message=msg)
actions = "actions=%s" % flow_dict.pop('actions')
- for key, value in flow_dict.iteritems():
+ for key, value in six.iteritems(flow_dict):
if key == 'proto':
flow_expr_arr.append(value)
else:
import netaddr
from oslo_log import log as logging
+import six
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
if ex_gw_port:
def _gateway_ports_equal(port1, port2):
def _get_filtered_dict(d, ignore):
- return dict((k, v) for k, v in d.iteritems()
+ return dict((k, v) for k, v in six.iteritems(d)
if k not in ignore)
keys_to_ignore = set(['binding:host_id'])
else:
return item
- for key, value in self.iteritems():
+ for key, value in six.iteritems(self):
if isinstance(value, (list, tuple)):
# Keep the same type but convert dicts to DictModels
self[key] = type(value)(
import netaddr
from oslo_config import cfg
from oslo_log import log as logging
+import six
from neutron.agent import firewall
from neutron.agent.linux import ipset_manager
remote_sgs_to_remove = self._determine_remote_sgs_to_remove(
filtered_ports)
- for ip_version, remote_sg_ids in remote_sgs_to_remove.iteritems():
+ for ip_version, remote_sg_ids in six.iteritems(remote_sgs_to_remove):
self._clear_sg_members(ip_version, remote_sg_ids)
if self.enable_ipset:
self._remove_ipsets_for_remote_sgs(ip_version, remote_sg_ids)
remote_group_id_sets = self._get_remote_sg_ids_sets_by_ipversion(
filtered_ports)
for ip_version, remote_group_id_set in (
- remote_group_id_sets.iteritems()):
+ six.iteritems(remote_group_id_sets)):
sgs_to_remove_per_ipversion[ip_version].update(
set(self.pre_sg_members) - remote_group_id_set)
return sgs_to_remove_per_ipversion
remote_group_id_sets = {constants.IPv4: set(),
constants.IPv6: set()}
for port in filtered_ports:
- for ip_version, sg_ids in self._get_remote_sg_ids(
- port).iteritems():
+ remote_sg_ids = self._get_remote_sg_ids(port)
+ for ip_version, sg_ids in six.iteritems(remote_sg_ids):
remote_group_id_sets[ip_version].update(sg_ids)
return remote_group_id_sets
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
+import six
from neutron.agent.common import config
from neutron.agent.linux import iptables_comments as ic
elif ip_version == 6:
tables = self.ipv6
- for table, chains in builtin_chains[ip_version].iteritems():
+ for table, chains in six.iteritems(builtin_chains[ip_version]):
for chain in chains:
tables[table].add_chain(chain)
tables[table].add_rule(chain, '-j $%s' %
from oslo_config import cfg
from oslo_log import log as logging
+import six
from webob import exc
from neutron.common import constants
{'check': [u'a', u'b'], 'name': [u'Bob']}
"""
res = {}
- for key, values in request.GET.dict_of_lists().iteritems():
+ for key, values in six.iteritems(request.GET.dict_of_lists()):
if key in skips:
continue
values = [v for v in values if v]
if not extension_attrs_map:
return
- for resource, attrs in extension_attrs_map.iteritems():
+ for resource, attrs in six.iteritems(extension_attrs_map):
extended_attrs = extended_attributes.get(resource)
if extended_attrs:
attrs.update(extended_attrs)
def action(self, request, id):
input_dict = self._deserialize(request.body,
request.get_content_type())
- for action_name, handler in self.action_handlers.iteritems():
+ for action_name, handler in six.iteritems(self.action_handlers):
if action_name in input_dict:
return handler(input_dict, request, id)
# no action handler found (bump to downstream application)
def index(self, request):
extensions = []
- for _alias, ext in self.extension_manager.extensions.iteritems():
+ for _alias, ext in six.iteritems(self.extension_manager.extensions):
extensions.append(self._translate(ext))
return dict(extensions=extensions)
LOG.debug('Extended resource: %s',
resource.collection)
- for action, method in resource.collection_actions.iteritems():
+ for action, method in six.iteritems(resource.collection_actions):
conditions = dict(method=[method])
path = "/%s/%s" % (resource.collection, action)
with mapper.submapper(controller=resource.controller,
continue
try:
extended_attrs = ext.get_extended_resources(version)
- for resource, resource_attrs in extended_attrs.iteritems():
- if attr_map.get(resource, None):
- attr_map[resource].update(resource_attrs)
+ for res, resource_attrs in six.iteritems(extended_attrs):
+ if attr_map.get(res, None):
+ attr_map[res].update(resource_attrs)
else:
- attr_map[resource] = resource_attrs
+ attr_map[res] = resource_attrs
except AttributeError:
LOG.exception(_LE("Error fetching extended attributes for "
"extension '%s'"), ext.get_name())
from oslo_log import log as logging
import oslo_messaging
+import six
from neutron.common import constants
from neutron.common import rpc as n_rpc
l3_router.append(router)
l3_routers[l3_agent.host] = l3_router
- for host, routers in l3_routers.iteritems():
+ for host, routers in six.iteritems(l3_routers):
cctxt = self.client.prepare(server=host)
cctxt.cast(context, method, routers=routers)
from oslo_log import log as logging
import oslo_messaging
from oslo_serialization import jsonutils
+import six
from neutron.common import constants
from neutron.common import exceptions
def update_floatingip_statuses(self, context, router_id, fip_statuses):
"""Update operational status for a floating IP."""
with context.session.begin(subtransactions=True):
- for (floatingip_id, status) in fip_statuses.iteritems():
+ for (floatingip_id, status) in six.iteritems(fip_statuses):
LOG.debug("New status for floating IP %(floatingip_id)s: "
"%(status)s", {'floatingip_id': floatingip_id,
'status': status})
# TODO(salv-orlando): Structure of dict attributes should be improved
# to avoid iterating over items
val_func = val_params = None
- for (k, v) in key_validator.iteritems():
+ for (k, v) in six.iteritems(key_validator):
if k.startswith('type:'):
# ask forgiveness, not permission
try:
return
# Check whether all required keys are present
- required_keys = [key for key, spec in key_specs.iteritems()
+ required_keys = [key for key, spec in six.iteritems(key_specs)
if spec.get('required')]
if required_keys:
# Perform validation and conversion of all values
# according to the specifications.
- for key, key_validator in [(k, v) for k, v in key_specs.iteritems()
+ for key, key_validator in [(k, v) for k, v in six.iteritems(key_specs)
if k in data]:
msg = _validate_dict_item(key, key_validator, data)
if msg:
key, value = convert_kvp_str_to_list(kvp_str)
kvp_map.setdefault(key, set())
kvp_map[key].add(value)
- return dict((x, list(y)) for x, y in kvp_map.iteritems())
+ return dict((x, list(y)) for x, y in six.iteritems(kvp_map))
def convert_none_to_empty_list(value):
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
+import six
import webob.exc
from neutron.api import api_common
self._resource)
def _get_primary_key(self, default_primary_key='id'):
- for key, value in self._attr_info.iteritems():
+ for key, value in six.iteritems(self._attr_info):
if value.get('primary_key', False):
return key
return default_primary_key
def _filter_attributes(self, context, data, fields_to_strip=None):
if not fields_to_strip:
return data
- return dict(item for item in data.iteritems()
+ return dict(item for item in six.iteritems(data)
if (item[0] not in fields_to_strip))
def _do_field_list(self, original_fields):
# Load object to check authz
# but pass only attributes in the original body and required
# by the policy engine to the policy 'brain'
- field_list = [name for (name, value) in self._attr_info.iteritems()
+ field_list = [name for (name, value) in six.iteritems(self._attr_info)
if (value.get('required_by_policy') or
value.get('primary_key') or
'default' not in value)]
Controller._verify_attributes(res_dict, attr_info)
if is_create: # POST
- for attr, attr_vals in attr_info.iteritems():
+ for attr, attr_vals in six.iteritems(attr_info):
if attr_vals['allow_post']:
if ('default' not in attr_vals and
attr not in res_dict):
msg = _("Attribute '%s' not allowed in POST") % attr
raise webob.exc.HTTPBadRequest(msg)
else: # PUT
- for attr, attr_vals in attr_info.iteritems():
+ for attr, attr_vals in six.iteritems(attr_info):
if attr in res_dict and not attr_vals['allow_put']:
msg = _("Cannot update read-only attribute %s") % attr
raise webob.exc.HTTPBadRequest(msg)
- for attr, attr_vals in attr_info.iteritems():
+ for attr, attr_vals in six.iteritems(attr_info):
if (attr not in res_dict or
res_dict[attr] is attributes.ATTR_NOT_SPECIFIED):
continue
from oslo_config import cfg
from oslo_log import log as logging
import routes as routes_mapper
+import six
import six.moves.urllib.parse as urlparse
import webob
import webob.dec
metadata = {}
layout = []
- for name, collection in self.resources.iteritems():
+ for name, collection in six.iteritems(self.resources):
href = urlparse.urljoin(req.path_url, collection)
resource = {'name': name,
'collection': collection,
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
+import six
from neutron.common import constants as q_const
def dict2str(dic):
return ','.join("%s=%s" % (key, val)
- for key, val in sorted(dic.iteritems()))
+ for key, val in sorted(six.iteritems(dic)))
def str2dict(string):
else:
query_filter = (model.tenant_id == context.tenant_id)
# Execute query hooks registered from mixins and plugins
- for _name, hooks in self._model_query_hooks.get(model,
- {}).iteritems():
+ for _name, hooks in six.iteritems(self._model_query_hooks.get(model,
+ {})):
query_hook = hooks.get('query')
if isinstance(query_hook, six.string_types):
query_hook = getattr(self, query_hook, None)
def _apply_filters_to_query(self, query, model, filters):
if filters:
- for key, value in filters.iteritems():
+ for key, value in six.iteritems(filters):
column = getattr(model, key, None)
if column:
if not value:
query = query.filter(sql.false())
return query
query = query.filter(column.in_(value))
- for _name, hooks in self._model_query_hooks.get(model,
- {}).iteritems():
+ for _nam, hooks in six.iteritems(self._model_query_hooks.get(model,
+ {})):
result_filter = hooks.get('result_filters', None)
if isinstance(result_filter, six.string_types):
result_filter = getattr(self, result_filter, None)
"""
columns = [c.name for c in model.__table__.columns]
return dict((k, v) for (k, v) in
- data.iteritems() if k in columns)
+ six.iteritems(data) if k in columns)
from oslo_db import exception as db_exc
from oslo_log import log as logging
import oslo_messaging
+import six
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy import or_
if active is not None:
query = (query.filter(agents_db.Agent.admin_state_up == active))
if filters:
- for key, value in filters.iteritems():
+ for key, value in six.iteritems(filters):
column = getattr(agents_db.Agent, key, None)
if column:
if not value:
from sqlalchemy.orm import exc
from oslo_utils import excutils
+import six
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
marker_obj = self._get_marker_obj(context, 'floatingip', limit,
marker)
if filters is not None:
- for key, val in API_TO_DB_COLUMN_MAP.iteritems():
+ for key, val in six.iteritems(API_TO_DB_COLUMN_MAP):
if key in filters:
filters[val] = filters.pop(key)
down_revision = '26b54cf9024d'
from alembic import op
+import six
import sqlalchemy as sa
from neutron.common import exceptions
raise DuplicateSecurityGroupsNamedDefault(
duplicates='; '.join('tenant %s: %s' %
(tenant_id, ', '.join(groups))
- for tenant_id, groups in res.iteritems()))
+ for tenant_id, groups in six.iteritems(res)))
def get_duplicate_default_security_groups(connection):
import re
import pep8
+import six
# Guidelines for writing new hacking checks
#
log_translation_hint = re.compile(
'|'.join('(?:%s)' % _regex_for_level(level, hint)
- for level, hint in _all_log_levels.iteritems()))
+ for level, hint in six.iteritems(_all_log_levels)))
oslo_namespace_imports_dot = re.compile(r"import[\s]+oslo[.][^\s]+")
oslo_namespace_imports_from_dot = re.compile(r"from[\s]+oslo[.]")
yield(0, msg)
+def check_python3_no_iteritems(logical_line):
+ if re.search(r".*\.iteritems\(\)", logical_line):
+ msg = ("N327: Use six.iteritems() instead of dict.iteritems().")
+ yield(0, msg)
+
+
def factory(register):
register(validate_log_translations)
register(use_jsonutils)
register(check_no_contextlib_nested)
register(check_python3_xrange)
register(check_no_basestring)
+ register(check_python3_no_iteritems)
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import importutils
+import six
from neutron.common import utils
from neutron.i18n import _LE, _LI
@classmethod
def get_service_plugins(cls):
# Return weakrefs to minimize gc-preventing references.
+ service_plugins = cls.get_instance().service_plugins
return dict((x, weakref.proxy(y))
- for x, y in cls.get_instance().service_plugins.iteritems())
+ for x, y in six.iteritems(service_plugins))
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+import six
from neutron.agent.common import ovs_lib
from neutron.agent.linux import ip_lib
:param interface_mappings: map physical net names to interface names.
'''
- for physical_network, interface in interface_mappings.iteritems():
+ for physical_network, interface in six.iteritems(interface_mappings):
LOG.info(_LI("Mapping physical network %(physical_network)s to "
"interface %(interface)s"),
{'physical_network': physical_network,
from oslo_db import exception as db_exc
from oslo_log import log
+import six
from sqlalchemy import or_
from sqlalchemy.orm import exc
return []
ports_to_sg_ids = get_sg_ids_grouped_by_port(context, port_ids)
return [make_port_dict_with_security_groups(port, sec_groups)
- for port, sec_groups in ports_to_sg_ids.iteritems()]
+ for port, sec_groups in six.iteritems(ports_to_sg_ids)]
def get_sg_ids_grouped_by_port(context, port_ids):
from oslo_log import log
from oslo_serialization import jsonutils
import requests
+import six
from neutron.plugins.ml2 import driver_api as api
"""
if isinstance(obj, dict):
obj = dict((self.escape(k), self.escape_keys(v))
- for k, v in obj.iteritems())
+ for k, v in six.iteritems(obj))
if isinstance(obj, list):
obj = [self.escape_keys(x) for x in obj]
return obj
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log
+import six
import sqlalchemy as sa
from neutron.common import exceptions as exc
% physical_network)
raise exc.InvalidInput(error_message=msg)
- for key, value in segment.iteritems():
+ for key, value in six.iteritems(segment):
if value and key not in [api.NETWORK_TYPE,
api.PHYSICAL_NETWORK]:
msg = _("%s prohibited for flat provider network") % key
# under the License.
from oslo_log import log
+import six
from neutron.common import exceptions as exc
from neutron.i18n import _LI
return False
def validate_provider_segment(self, segment):
- for key, value in segment.iteritems():
+ for key, value in six.iteritems(segment):
if value and key != api.NETWORK_TYPE:
msg = _("%s prohibited for local provider network") % key
raise exc.InvalidInput(error_message=msg)
from oslo_config import cfg
from oslo_log import log
+import six
import stevedore
from neutron.api.v2 import attributes
network[provider.SEGMENTATION_ID] = segment[api.SEGMENTATION_ID]
def initialize(self):
- for network_type, driver in self.drivers.iteritems():
+ for network_type, driver in six.iteritems(self.drivers):
LOG.info(_LI("Initializing driver for type '%s'"), network_type)
driver.obj.initialize()
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
+import six
from six import moves
from neutron.agent.common import ovs_lib
start_listening=False)
def get_net_uuid(self, vif_id):
- for network_id, vlan_mapping in self.local_vlan_map.iteritems():
+ for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
if vif_id in vlan_mapping.vif_ports:
return network_id
ip_wrapper = ip_lib.IPWrapper()
ovs = ovs_lib.BaseOVS()
ovs_bridges = ovs.get_bridges()
- for physical_network, bridge in bridge_mappings.iteritems():
+ for physical_network, bridge in six.iteritems(bridge_mappings):
LOG.info(_LI("Mapping physical network %(physical_network)s to "
"bridge %(bridge)s"),
{'physical_network': physical_network,
import re
from oslo_log import log as logging
+import six
from neutron.i18n import _LE, _LW
from neutron.plugins.sriovnicagent.common import exceptions as exc
"""
if exclude_devices is None:
exclude_devices = {}
- for phys_net, dev_name in device_mappings.iteritems():
+ for phys_net, dev_name in six.iteritems(device_mappings):
self._create_emb_switch(phys_net, dev_name,
exclude_devices.get(dev_name, set()))
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
+import six
from neutron.api.v2 import attributes
from neutron.common import constants as const
validate = attribute.get('validate')
return (validate and isinstance(sub_attr, collections.Iterable) and
any([k.startswith('type:dict') and
- v for (k, v) in validate.iteritems()]))
+ v for (k, v) in six.iteritems(validate)]))
def _build_subattr_match_rule(attr_name, attr, action, target):
# License for the specific language governing permissions and limitations
# under the License.
+import six
from tempest_lib.common.utils import data_utils
from neutron.tests.api import base
quota_set = self.admin_client.update_quotas(tenant_id,
**new_quotas)
self.addCleanup(self.admin_client.reset_quotas, tenant_id)
- for key, value in new_quotas.iteritems():
+ for key, value in six.iteritems(new_quotas):
self.assertEqual(value, quota_set[key])
# Confirm our tenant is listed among tenants with non default quotas
# Confirm from API quotas were changed as requested for tenant
quota_set = self.admin_client.show_quotas(tenant_id)
quota_set = quota_set['quota']
- for key, value in new_quotas.iteritems():
+ for key, value in six.iteritems(new_quotas):
self.assertEqual(value, quota_set[key])
# Reset quotas to default and confirm
import netaddr
import random
+import six
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
+ kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
real_ip, eui_ip = self._get_ips_from_subnet(**kwargs)
self._clean_network()
self.assertEqual(eui_ip, real_ip,
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
+ kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
subnet = self.create_subnet(self.network, **kwargs)
port = self.create_port(self.network)
port_ip = next(iter(port['fixed_ips']), None)['ip_address']
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
+ kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
subnet = self.create_subnet(self.network, **kwargs)
ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
subnet["allocation_pools"][0]["end"])
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
+ kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
subnet, port = self._create_subnet_router(kwargs)
port_ip = next(iter(port['fixed_ips']), None)['ip_address']
self._clean_network()
# License for the specific language governing permissions and limitations
# under the License.
+import six
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
def test_show_firewall_rule(self):
# show a created firewall rule
fw_rule = self.client.show_firewall_rule(self.fw_rule['id'])
- for key, value in fw_rule['firewall_rule'].iteritems():
+ for key, value in six.iteritems(fw_rule['firewall_rule']):
self.assertEqual(self.fw_rule[key], value)
@test.idempotent_id('1086dd93-a4c0-4bbb-a1bd-6d4bc62c199f')
# show a created firewall policy
fw_policy = self.client.show_firewall_policy(self.fw_policy['id'])
fw_policy = fw_policy['firewall_policy']
- for key, value in fw_policy.iteritems():
+ for key, value in six.iteritems(fw_policy):
self.assertEqual(self.fw_policy[key], value)
@test.idempotent_id('02082a03-3cdd-4789-986a-1327dd80bfb7')
firewall = self.client.show_firewall(firewall_id)
firewall = firewall['firewall']
- for key, value in firewall.iteritems():
+ for key, value in six.iteritems(firewall):
if key == 'status':
continue
self.assertEqual(created_firewall[key], value)
# License for the specific language governing permissions and limitations
# under the License.
+import six
from tempest_lib.common.utils import data_utils
from tempest_lib import decorators
body = create_obj(**kwargs)
obj = body[obj_name]
self.addCleanup(delete_obj, obj['id'])
- for key, value in obj.iteritems():
+ for key, value in six.iteritems(obj):
# It is not relevant to filter by all arguments. That is why
# there is a list of attr to except
if key not in attr_exceptions:
# Verifies the details of a vip
body = self.client.show_vip(self.vip['id'])
vip = body['vip']
- for key, value in vip.iteritems():
+ for key, value in six.iteritems(vip):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(self.vip[key], value)
# Verifies the details of a pool
body = self.client.show_pool(pool['id'])
shown_pool = body['pool']
- for key, value in pool.iteritems():
+ for key, value in six.iteritems(pool):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(value, shown_pool[key])
# Verifies the details of a member
body = self.client.show_member(self.member['id'])
member = body['member']
- for key, value in member.iteritems():
+ for key, value in six.iteritems(member):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(self.member[key], value)
# Verifies the details of a health_monitor
body = self.client.show_health_monitor(self.health_monitor['id'])
health_monitor = body['health_monitor']
- for key, value in health_monitor.iteritems():
+ for key, value in six.iteritems(health_monitor):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(self.health_monitor[key], value)
import itertools
import netaddr
+import six
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
**kwargs)
compare_args_full = dict(gateway_ip=gateway, cidr=cidr,
mask_bits=mask_bits, **kwargs)
- compare_args = dict((k, v) for k, v in compare_args_full.iteritems()
+ compare_args = dict((k, v) for k, v in six.iteritems(compare_args_full)
if v is not None)
if 'dns_nameservers' in set(subnet).intersection(compare_args):
# under the License.
import netaddr
+import six
from tempest_lib.common.utils import data_utils
from neutron.tests.api import base_routers as base
self.assertIsNone(actual_ext_gw_info)
return
# Verify only keys passed in exp_ext_gw_info
- for k, v in exp_ext_gw_info.iteritems():
+ for k, v in six.iteritems(exp_ext_gw_info):
self.assertEqual(v, actual_ext_gw_info[k])
def _verify_gateway_port(self, router_id):
# License for the specific language governing permissions and limitations
# under the License.
+import six
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
def _assertExpected(self, expected, actual):
# Check if not expected keys/values exists in actual response body
- for key, value in expected.iteritems():
+ for key, value in six.iteritems(expected):
self.assertIn(key, actual)
self.assertEqual(value, actual[key])
# Confirm that update was successful by verifying using 'show'
body = self.client.show_ikepolicy(ikepolicy['id'])
ike_policy = body['ikepolicy']
- for key, value in new_ike.iteritems():
+ for key, value in six.iteritems(new_ike):
self.assertIn(key, ike_policy)
self.assertEqual(value, ike_policy[key])
from oslo_config import cfg
from oslo_messaging import conffixture as messaging_conffixture
from oslo_utils import strutils
+import six
import testtools
from neutron.agent.linux import external_process
self.assertEqual(expect_val, actual_val)
def sort_dict_lists(self, dic):
- for key, value in dic.iteritems():
+ for key, value in six.iteritems(dic):
if isinstance(value, list):
dic[key] = sorted(value)
elif isinstance(value, dict):
test by the fixtures cleanup process.
"""
group = kw.pop('group', None)
- for k, v in kw.iteritems():
+ for k, v in six.iteritems(kw):
CONF.set_override(k, v, group)
def setup_coreplugin(self, core_plugin=None):
:param other: dictionary to be directly modified.
"""
- for key, value in other.iteritems():
+ for key, value in six.iteritems(other):
if isinstance(value, dict):
if not isinstance(value, base.AttributeDict):
other[key] = base.AttributeDict(value)
import netaddr
from oslo_config import cfg
from oslo_log import log as logging
+import six
import testtools
import webob
import webob.dec
# Get the last state reported for each router
actual_router_states = {}
for call in calls:
- for router_id, state in call.iteritems():
+ for router_id, state in six.iteritems(call):
actual_router_states[router_id] = state
return actual_router_states == expected
import re
+import six
from testtools import helpers
"""
def match(self, actual):
- for key, value in actual.iteritems():
+ for key, value in six.iteritems(actual):
if key in ('content-length', 'x-account-bytes-used',
'x-account-container-count', 'x-account-object-count',
'x-container-bytes-used', 'x-container-object-count')\
import functools
import jsonschema
+import six
from oslo_log import log as logging
if schema_type == 'object':
properties = schema["properties"]
- for attribute, definition in properties.iteritems():
+ for attribute, definition in six.iteritems(properties):
current_path = copy.copy(path)
if path is not None:
current_path.append(attribute)
# under the License.
from oslo_log import log as logging
+import six
import neutron.tests.tempest.common.generator.base_generator as base
@base.simple_generator
def generate_valid_object(self, schema):
obj = {}
- for k, v in schema["properties"].iteritems():
+ for k, v in six.iteritems(schema["properties"]):
obj[k] = self.generate_valid(v)
return obj
# under the License.
import fixtures
+import six
from neutron.api.v2 import attributes
# deeper than a shallow copy.
super(AttributeMapMemento, self).setUp()
self.contents_backup = {}
- for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
- self.contents_backup[resource] = attrs.copy()
+ for res, attrs in six.iteritems(attributes.RESOURCE_ATTRIBUTE_MAP):
+ self.contents_backup[res] = attrs.copy()
self.addCleanup(self.restore)
def restore(self):
import mock
from oslo_config import cfg
+import six
from neutron.agent.common import config as a_cfg
from neutron.agent.linux import ipset_manager
remote_groups = remote_groups or {_IPv4: [FAKE_SGID],
_IPv6: [FAKE_SGID]}
rules = []
- for ip_version, remote_group_list in remote_groups.iteritems():
+ for ip_version, remote_group_list in six.iteritems(remote_groups):
for remote_group in remote_group_list:
rules.append(self._fake_sg_rule_for_ethertype(ip_version,
remote_group))
from oslo_log import log as logging
from oslo_serialization import jsonutils
import routes
+import six
import webob
import webob.exc as webexc
import webtest
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
- for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
- self.saved_attr_map[resource] = attrs.copy()
+ for res, attrs in six.iteritems(attributes.RESOURCE_ATTRIBUTE_MAP):
+ self.saved_attr_map[res] = attrs.copy()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
import mock
from oslo_config import cfg
+import six
from six import moves
import six.moves.urllib.parse as urlparse
import webob
output_dict = res['networks'][0]
input_dict['shared'] = False
self.assertEqual(len(input_dict), len(output_dict))
- for k, v in input_dict.iteritems():
+ for k, v in six.iteritems(input_dict):
self.assertEqual(v, output_dict[k])
else:
# expect no results
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import importutils
+import six
from sqlalchemy import orm
from testtools import matchers
import webob.exc
cfg.CONF.set_override(
'service_plugins',
[test_lib.test_config.get(key, default)
- for key, default in (service_plugins or {}).iteritems()]
+ for key, default in six.iteritems(service_plugins or {})]
)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
import mock
import oslo_db.exception as exc
+import six
import testtools
import webob.exc
"""Asserts that the sg rule has expected key/value pairs passed
in as expected_kvs dictionary
"""
- for k, v in expected_kvs.iteritems():
+ for k, v in six.iteritems(expected_kvs):
self.assertEqual(security_group_rule[k], v)
test_addr = {'192.168.1.1/24': 'IPv6',
'2001:db8:1234::/48': 'IPv4',
'192.168.2.1/24': 'BadEthertype'}
- for remote_ip_prefix, ethertype in test_addr.iteritems():
+ for remote_ip_prefix, ethertype in six.iteritems(test_addr):
with self.security_group(name, description) as sg:
sg_id = sg['security_group']['id']
rule = self._build_security_group_rule(
def test_convert_ip_prefix_no_netmask_to_cidr(self):
addr = {'10.1.2.3': '32', 'fe80::2677:3ff:fe7d:4c': '128'}
- for k, v in addr.iteritems():
+ for k, v in six.iteritems(addr):
self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(k),
'%s/%s' % (k, v))
# under the License.
from oslo_config import cfg
+import six
from webob import exc as web_exc
from neutron.api.v2 import attributes
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
- for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
- self.saved_attr_map[resource] = attrs.copy()
+ for res, attrs in six.iteritems(attributes.RESOURCE_ATTRIBUTE_MAP):
+ self.saved_attr_map[res] = attrs.copy()
# Update the plugin and extensions path
self.setup_coreplugin(plugin)
def test_no_basestring(self):
self.assertEqual(1,
len(list(checks.check_no_basestring("isinstance(x, basestring)"))))
+
+ def test_check_python3_iteritems(self):
+ f = checks.check_python3_no_iteritems
+ self.assertLineFails(f, "d.iteritems()")
+ self.assertLinePasses(f, "six.iteritems(d)")
from oslo_serialization import jsonutils
from oslo_utils import excutils
import routes.middleware
+import six
import webob.dec
import webob.exc
resp = req.get_response(self.application)
print(("*" * 40) + " RESPONSE HEADERS")
- for (key, value) in resp.headers.iteritems():
+ for (key, value) in six.iteritems(resp.headers):
print(key, "=", value)
print()