try:
with open(file_name, 'r') as f:
try:
- return converter and converter(f.read()) or f.read()
+ return converter(f.read()) if converter else f.read()
except ValueError:
msg = _('Unable to convert value in %s')
except IOError:
try:
obj, cmd = create_process(cmd, root_helper=root_helper,
addl_env=addl_env)
- _stdout, _stderr = (process_input and
- obj.communicate(process_input) or
- obj.communicate())
+ _stdout, _stderr = obj.communicate(process_input)
obj.stdin.close()
m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdout: %(stdout)r\n"
"Stderr: %(stderr)r") % {'cmd': cmd, 'code': obj.returncode,
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
- return return_stderr and (_stdout, _stderr) or _stdout
+ return (_stdout, _stderr) if return_stderr else _stdout
def get_interface_mac(interface):
try:
with open(file_name, 'r') as f:
try:
- return converter and converter(f.read()) or f.read()
+ return converter(f.read()) if converter else f.read()
except ValueError:
msg = _('Unable to convert value in %s')
except IOError:
def _agent_notification(self, context, method, router_ids, operation,
shuffle_agents):
"""Notify changed routers to hosting l3 agents."""
- adminContext = context.is_admin and context or context.elevated()
+ adminContext = context if context.is_admin else context.elevated()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
for router_id in router_ids:
def _agent_notification(self, context, method, routers):
"""Notify l3 metering agents hosted by l3 agent hosts."""
- adminContext = context.is_admin and context or context.elevated()
+ adminContext = context if context.is_admin else context.elevated()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
else:
bind_port.host = host
else:
- host = (bind_port and bind_port.host or None)
+ host = bind_port.host if bind_port else None
self._extend_port_dict_binding_host(port, host)
def get_port_host(self, context, port_id):
with context.session.begin(subtransactions=True):
bind_port = context.session.query(
PortBindingPort).filter_by(port_id=port_id).first()
- return bind_port and bind_port.host or None
+ return bind_port.host if bind_port else None
def _extend_port_dict_binding_host(self, port_res, host):
super(PortBindingMixin, self).extend_port_dict_binding(
port_res[portbindings.HOST_ID] = host
def extend_port_dict_binding(self, port_res, port_db):
- host = (port_db.portbinding and port_db.portbinding.host or None)
+ host = port_db.portbinding.host if port_db.portbinding else None
self._extend_port_dict_binding_host(port_res, host)
debug_agent = self.get_debug_agent()
info = debug_agent.list_probes()
- columns = len(info) > 0 and sorted(info[0].keys()) or []
+ columns = sorted(info[0].keys()) if info else []
return (columns, (utils.get_item_properties(
s, columns, formatters=self._formatters, )
for s in info), )
def get_next_vlan(self, vlan_id=None):
"""Try to get a specific vlan if requested or get the next vlan."""
min_vlan_search = vlan_id or MIN_VLAN
- max_vlan_search = (vlan_id and vlan_id + 1) or MAX_VLAN
+ max_vlan_search = (vlan_id + 1) if vlan_id else MAX_VLAN
for vlan in moves.xrange(min_vlan_search, max_vlan_search):
if vlan not in self.vlans:
def _agent_notification(self, context, method, routers, operation, data):
"""Notify individual Cisco cfg agents."""
- admin_context = context.is_admin and context or context.elevated()
+ admin_context = context if context.is_admin else context.elevated()
for router in routers:
if router['hosting_device'] is None:
continue
net = (session.query(AristaProvisionedNets).
filter_by(tenant_id=tenant_id,
network_id=network_id).first())
- return net and net.segmentation_id or None
+ return net.segmentation_id if net else None
def is_vm_provisioned(vm_id, host_id, port_id,
try:
return query.filter_by(net_id=network_id).one()
except (orm.exc.NoResultFound, d_exc.DBError):
- logger = raise_on_err and LOG.error or LOG.warn
- logger(_('Unable to find Logical Service Node for '
- 'network %s'), network_id)
+ msg = _('Unable to find Logical Service Node for network %s')
if raise_on_err:
+ LOG.error(msg, network_id)
raise p_exc.LsnNotFound(entity='network',
entity_id=network_id)
+ else:
+ LOG.warn(msg, network_id)
def lsn_port_add_for_lsn(context, lsn_port_id, subnet_id, mac, lsn_id):
'devices': device_list,
'tenant_id': network_gateway['tenant_id']}
# Query gateway connections only if needed
- if (fields and 'ports' in fields) or not fields:
+ if not fields or 'ports' in fields:
res['ports'] = [self._make_gw_connection_dict(conn)
for conn in network_gateway.network_connections]
return self._fields(res, fields)
query = self._get_collection_query(context,
NetworkConnection,
filters)
- return only_one and query.one() or query.all()
+ return query.one() if only_one else query.all()
def _unset_default_network_gateways(self, context):
with context.session.begin(subtransactions=True):
try:
return lsn_api.lsn_for_network_get(self.cluster, network_id)
except (n_exc.NotFound, api_exc.NsxApiException):
- logger = raise_on_err and LOG.error or LOG.warn
- logger(_('Unable to find Logical Service Node for '
- 'network %s'), network_id)
+ msg = _('Unable to find Logical Service Node for network %s')
if raise_on_err:
+ LOG.error(msg, network_id)
raise p_exc.LsnNotFound(entity='network',
entity_id=network_id)
+ else:
+ LOG.warn(msg, network_id)
def lsn_create(self, context, network_id):
"""Create a LSN associated to the network."""
lsn_port_id = lsn_api.lsn_port_by_subnet_get(
self.cluster, lsn_id, subnet_id)
except (n_exc.NotFound, api_exc.NsxApiException):
- logger = raise_on_err and LOG.error or LOG.warn
- logger(_('Unable to find Logical Service Node Port for '
- 'LSN %(lsn_id)s and subnet %(subnet_id)s')
- % {'lsn_id': lsn_id, 'subnet_id': subnet_id})
+ msg = _('Unable to find Logical Service Node Port for '
+ 'LSN %(lsn_id)s and subnet %(subnet_id)s')
if raise_on_err:
+ LOG.error(msg, {'lsn_id': lsn_id, 'subnet_id': subnet_id})
raise p_exc.LsnPortNotFound(lsn_id=lsn_id,
entity='subnet',
entity_id=subnet_id)
+ else:
+ LOG.warn(msg, {'lsn_id': lsn_id, 'subnet_id': subnet_id})
return (lsn_id, None)
else:
return (lsn_id, lsn_port_id)
lsn_port_id = lsn_api.lsn_port_by_mac_get(
self.cluster, lsn_id, mac)
except (n_exc.NotFound, api_exc.NsxApiException):
- logger = raise_on_err and LOG.error or LOG.warn
- logger(_('Unable to find Logical Service Node Port for '
- 'LSN %(lsn_id)s and mac address %(mac)s')
- % {'lsn_id': lsn_id, 'mac': mac})
+ msg = _('Unable to find Logical Service Node Port for '
+ 'LSN %(lsn_id)s and mac address %(mac)s')
if raise_on_err:
+ LOG.error(msg, {'lsn_id': lsn_id, 'mac': mac})
raise p_exc.LsnPortNotFound(lsn_id=lsn_id,
entity='MAC',
entity_id=mac)
+ else:
+ LOG.warn(msg, {'lsn_id': lsn_id, 'mac': mac})
return (lsn_id, None)
else:
return (lsn_id, lsn_port_id)
is_attachment=False,
extra_action=None):
resources = resource.split('/')
- res_path = resources[0] + (resource_id and "/%s" % resource_id or '')
+ res_path = resources[0]
+ if resource_id:
+ res_path += "/%s" % resource_id
if len(resources) > 1:
# There is also a parent resource to account for in the uri
res_path = "%s/%s/%s" % (resources[1],
is_attachment=False):
uri_prefix = "%s/%s/%s" % (URI_PREFIX, edge_id, service)
if resource:
- res_path = resource + (resource_id and "/%s" % resource_id or '')
+ res_path = resource
+ if resource_id:
+ res_path += "/%s" % resource_id
uri_path = "%s/%s" % (uri_prefix, res_path)
else:
uri_path = uri_prefix
target = credentials
# Backward compatibility: if ADMIN_CTX_POLICY is not
# found, default to validating role:admin
- admin_policy = (ADMIN_CTX_POLICY in policy._rules
- and ADMIN_CTX_POLICY or 'role:admin')
+ admin_policy = (ADMIN_CTX_POLICY if ADMIN_CTX_POLICY in policy._rules
+ else 'role:admin')
return policy.check(admin_policy, target, credentials)
self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule)
def _convert_fwaas_to_iptables_rule(self, rule):
- action = rule.get('action') == 'allow' and 'ACCEPT' or 'DROP'
+ action = 'ACCEPT' if rule.get('action') == 'allow' else 'DROP'
args = [self._protocol_arg(rule.get('protocol')),
self._port_arg('dport',
rule.get('protocol'),
This method will find where is the router, and
dispatch notification for the agent.
"""
- admin_context = context.is_admin and context or context.elevated()
+ admin_context = context if context.is_admin else context.elevated()
if not version:
version = self.RPC_API_VERSION
l3_agents = self.driver.l3_plugin.get_l3_agents_hosting_routers(
Find the host for the router being notified and then
dispatches a notification for the VPN device driver.
"""
- admin_context = context.is_admin and context or context.elevated()
+ admin_context = context if context.is_admin else context.elevated()
if not version:
version = self.RPC_API_VERSION
host = via_cfg_file.get_host_for_router(admin_context, router_id)
def reset_reponses(self, req=None):
# Clear all staged responses.
- reqs = req and [req] or ['post', 'get'] # Both if none specified.
+ reqs = [req] if req else ['post', 'get'] # Both if none specified.
for req in reqs:
del self.response[req][:]
self.restart_responses(req)
def _stage_mocked_response(self, req, mock_status, mo, **attrs):
response = mock.MagicMock()
response.status_code = mock_status
- mo_attrs = attrs and [{mo: {'attributes': attrs}}] or []
+ mo_attrs = [{mo: {'attributes': attrs}}] if attrs else []
response.json.return_value = {'imdata': mo_attrs}
self.response[req].append(response)
class TestNetworksV2(test_plugin.TestNetworksV2, NsxPluginV2TestCase):
def _test_create_bridge_network(self, vlan_id=0):
- net_type = vlan_id and 'vlan' or 'flat'
+ net_type = 'vlan' if vlan_id else 'flat'
name = 'bridge_net'
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),