key=operator.itemgetter(0))
itemgetter_1 = operator.itemgetter(1)
for action, action_flow_list in grouped:
- flows = map(itemgetter_1, action_flow_list)
+ flows = list(map(itemgetter_1, action_flow_list))
self.br.do_action_flows(action, flows)
def __enter__(self):
The return value will be a tuple of the process object and the
list of command arguments used to create it.
"""
- cmd = map(str, addl_env_args(addl_env) + cmd)
+ cmd = list(map(str, addl_env_args(addl_env) + cmd))
if run_as_root:
cmd = shlex.split(config.get_root_helper(cfg.CONF)) + cmd
LOG.debug("Running command: %s", cmd)
def execute_rootwrap_daemon(cmd, process_input, addl_env):
- cmd = map(str, addl_env_args(addl_env) + cmd)
+ cmd = list(map(str, addl_env_args(addl_env) + cmd))
# NOTE(twilson) oslo_rootwrap.daemon will raise on filter match
# errors, whereas oslo_rootwrap.cmd converts them to return codes.
# In practice, no neutron code should be trying to execute something that
def create_process(cmd, addl_env=None):
- cmd = map(str, cmd)
+ cmd = list(map(str, cmd))
LOG.debug("Running command: %s", cmd)
env = os.environ.copy()
in_(AUTO_DELETE_PORT_OWNERS)))
network_ports = qry_network_ports.all()
if network_ports:
- map(context.session.delete, network_ports)
+ for port in network_ports:
+ context.session.delete(port)
# Check if there are more IP allocations, unless
# is_auto_address_subnet is True. In that case the check is
# unnecessary. This additional check not only would be wasteful
mods = repos.NeutronModules()
-VALID_SERVICES = map(mods.alembic_name, mods.installed_list())
+VALID_SERVICES = list(map(mods.alembic_name, mods.installed_list()))
_core_opts = [
self.interfaces = {}
self.lldpcmd = None
self.peers = {}
- self.port_desc_re = map(re.compile, ACI_PORT_DESCR_FORMATS)
+ self.port_desc_re = list(map(re.compile, ACI_PORT_DESCR_FORMATS))
self.service_agent = ApicTopologyServiceNotifierApi()
self.state = None
self.state_agent = None
allocated = qry_allocated.all()
# Delete all the IPAllocation that can be auto-deleted
if allocated:
- map(session.delete, allocated)
+ for x in allocated:
+ session.delete(x)
LOG.debug("Ports to auto-deallocate: %s", allocated)
# Check if there are more IP allocations, unless
# is_auto_address_subnet is True. In that case the check is
CONF.network.public_network_id)
public_subnet_id = public_net_body['network']['subnets'][0]
self.assertIn(public_subnet_id,
- map(lambda x: x['subnet_id'], fixed_ips))
+ [x['subnet_id'] for x in fixed_ips])
@test.attr(type='smoke')
@test.idempotent_id('6cc285d8-46bf-4f36-9b1a-783e3008ba79')
ri.router = {'distributed': False}
ri._handle_router_snat_rules(ex_gw_port, "iface", "add_rules")
- nat_rules = map(str, ri.iptables_manager.ipv4['nat'].rules)
+ nat_rules = list(map(str, ri.iptables_manager.ipv4['nat'].rules))
wrap_name = ri.iptables_manager.wrap_name
jump_float_rule = "-A %s-snat -j %s-float-snat" % (wrap_name,
self.assertThat(nat_rules.index(jump_float_rule),
matchers.LessThan(nat_rules.index(snat_rule1)))
- mangle_rules = map(str, ri.iptables_manager.ipv4['mangle'].rules)
+ mangle_rules = list(map(str, ri.iptables_manager.ipv4['mangle'].rules))
mangle_rule = ("-A %s-mark -i iface "
"-j MARK --set-xmark 0x2/0xffffffff") % wrap_name
self.assertIn(mangle_rule, mangle_rules)
plugin.get_ports_from_devices(self.ctx,
['%s%s' % (const.TAP_DEVICE_PREFIX, i)
for i in range(ports_to_query)])
- all_call_args = map(lambda x: x[1][1], get_mock.mock_calls)
+ all_call_args = [x[1][1] for x in get_mock.mock_calls]
last_call_args = all_call_args.pop()
# all but last should be getting MAX_PORTS_PER_QUERY ports
self.assertTrue(