p = port['port']
ips = []
- fixed_configured = (p['fixed_ips'] != attributes.ATTR_NOT_SPECIFIED)
+ fixed_configured = p['fixed_ips'] is not attributes.ATTR_NOT_SPECIFIED
if fixed_configured:
configured_ips = self._test_fixed_ips_for_port(context,
p["network_id"],
"""
pools = []
- if subnet['allocation_pools'] == attributes.ATTR_NOT_SPECIFIED:
+ if subnet['allocation_pools'] is attributes.ATTR_NOT_SPECIFIED:
# Auto allocate the pool around gateway_ip
net = netaddr.IPNetwork(subnet['cidr'])
first_ip = net.first + 1
if 'cidr' in s:
self._validate_ip_version(ip_ver, s['cidr'], 'cidr')
- if ('gateway_ip' in s and
- s['gateway_ip'] and
- s['gateway_ip'] != attributes.ATTR_NOT_SPECIFIED):
+ if attributes.is_attr_set(s.get('gateway_ip')):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
if (cfg.CONF.force_gateway_on_subnet and
not QuantumDbPluginV2._check_subnet_ip(s['cidr'],
error_message = _("Gateway is not valid on subnet")
raise q_exc.InvalidInput(error_message=error_message)
- if ('dns_nameservers' in s and
- s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED):
+ if attributes.is_attr_set(s.get('dns_nameservers')):
if len(s['dns_nameservers']) > cfg.CONF.max_dns_nameservers:
raise q_exc.DNSNameServersExhausted(
subnet_id=id,
dns))
self._validate_ip_version(ip_ver, dns, 'dns_nameserver')
- if ('host_routes' in s and
- s['host_routes'] != attributes.ATTR_NOT_SPECIFIED):
+ if attributes.is_attr_set(s.get('host_routes')):
if len(s['host_routes']) > cfg.CONF.max_subnet_host_routes:
raise q_exc.HostRoutesExhausted(
subnet_id=id,
self._validate_subnet(s)
net = netaddr.IPNetwork(s['cidr'])
- if s['gateway_ip'] == attributes.ATTR_NOT_SPECIFIED:
+ if s['gateway_ip'] is attributes.ATTR_NOT_SPECIFIED:
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
tenant_id = self._get_tenant_id_for_create(context, s)
pools = self._allocate_pools_for_subnet(context, s)
context.session.add(subnet)
- if s['dns_nameservers'] != attributes.ATTR_NOT_SPECIFIED:
+ if s['dns_nameservers'] is not attributes.ATTR_NOT_SPECIFIED:
for addr in s['dns_nameservers']:
ns = models_v2.DNSNameServer(address=addr,
subnet_id=subnet.id)
context.session.add(ns)
- if s['host_routes'] != attributes.ATTR_NOT_SPECIFIED:
+ if s['host_routes'] is not attributes.ATTR_NOT_SPECIFIED:
for rt in s['host_routes']:
route = models_v2.Route(subnet_id=subnet.id,
destination=rt['destination'],
# Ensure that a MAC address is defined and it is unique on the
# network
- if mac_address == attributes.ATTR_NOT_SPECIFIED:
+ if mac_address is attributes.ATTR_NOT_SPECIFIED:
mac_address = QuantumDbPluginV2._generate_mac(context,
network_id)
else:
# validate network ownership
super(NECPluginV2Base, self).get_network(context, pf['network_id'])
- if pf.get('in_port') != attributes.ATTR_NOT_SPECIFIED:
+ if pf.get('in_port') is not attributes.ATTR_NOT_SPECIFIED:
# validate port ownership
super(NECPluginV2Base, self).get_port(context, pf['in_port'])
'dst_port': 0,
'protocol': ''}
for key, default in conditions.items():
- if pf.get(key) == attributes.ATTR_NOT_SPECIFIED:
+ if pf.get(key) is attributes.ATTR_NOT_SPECIFIED:
params.update({key: default})
else:
params.update({key: pf.get(key)})
def _is_attribute_explicitly_set(attribute_name, resource, target):
"""Verify that an attribute is present and has a non-default value"""
- if ('default' in resource[attribute_name] and
- target.get(attribute_name, attributes.ATTR_NOT_SPECIFIED) !=
- attributes.ATTR_NOT_SPECIFIED):
- if (target[attribute_name] != resource[attribute_name]['default']):
- return True
- return False
+ return ('default' in resource[attribute_name] and
+ attribute_name in target and
+ target[attribute_name] is not attributes.ATTR_NOT_SPECIFIED and
+ target[attribute_name] != resource[attribute_name]['default'])
def _build_target(action, original_target, plugin, context):