LOG = logging.getLogger(__name__)
-
-DEVICE_OWNER_DVR_INTERFACE = l3_const.DEVICE_OWNER_DVR_INTERFACE
-DEVICE_OWNER_DVR_SNAT = l3_const.DEVICE_OWNER_ROUTER_SNAT
-FLOATINGIP_AGENT_INTF_KEY = l3_const.FLOATINGIP_AGENT_INTF_KEY
-DEVICE_OWNER_AGENT_GW = l3_const.DEVICE_OWNER_AGENT_GW
-SNAT_ROUTER_INTF_KEY = l3_const.SNAT_ROUTER_INTF_KEY
-
-
router_distributed_opts = [
cfg.BoolOpt('router_distributed',
default=False,
router_device_owners = (
l3_db.L3_NAT_db_mixin.router_device_owners +
- (DEVICE_OWNER_DVR_INTERFACE,
- DEVICE_OWNER_DVR_SNAT,
- DEVICE_OWNER_AGENT_GW))
+ (l3_const.DEVICE_OWNER_DVR_INTERFACE,
+ l3_const.DEVICE_OWNER_ROUTER_SNAT,
+ l3_const.DEVICE_OWNER_AGENT_GW))
extra_attributes = (
l3_attrs_db.ExtraAttributesMixin.extra_attributes + [{
"""Update the model to support the dvr case of a router."""
if data.get('distributed'):
old_owner = l3_const.DEVICE_OWNER_ROUTER_INTF
- new_owner = DEVICE_OWNER_DVR_INTERFACE
+ new_owner = l3_const.DEVICE_OWNER_DVR_INTERFACE
for rp in router_db.attached_ports.filter_by(port_type=old_owner):
rp.port_type = new_owner
rp.port.device_owner = new_owner
if router_is_uuid:
router = self._get_router(context, router)
if is_distributed_router(router):
- return DEVICE_OWNER_DVR_INTERFACE
+ return l3_const.DEVICE_OWNER_DVR_INTERFACE
return super(L3_NAT_with_dvr_db_mixin,
self)._get_device_owner(context, router)
def _port_has_ipv6_address(self, port):
"""Overridden to return False if DVR SNAT port."""
- if port['device_owner'] == DEVICE_OWNER_DVR_SNAT:
+ if port['device_owner'] == l3_const.DEVICE_OWNER_ROUTER_SNAT:
return False
return super(L3_NAT_with_dvr_db_mixin,
self)._port_has_ipv6_address(port)
qry = context.session.query(l3_db.RouterPort)
qry = qry.filter(
l3_db.RouterPort.router_id.in_(router_ids),
- l3_db.RouterPort.port_type == DEVICE_OWNER_DVR_SNAT
+ l3_db.RouterPort.port_type == l3_const.DEVICE_OWNER_ROUTER_SNAT
)
interfaces = collections.defaultdict(list)
for rp in qry:
if router['gw_port_id']:
snat_router_intfs = snat_intfs_by_router_id[router['id']]
LOG.debug("SNAT ports returned: %s ", snat_router_intfs)
- router[SNAT_ROUTER_INTF_KEY] = snat_router_intfs
+ router[l3_const.SNAT_ROUTER_INTF_KEY] = snat_router_intfs
return routers_dict
def _process_floating_ips_dvr(self, context, routers_dict,
if not fip_agent_id:
return []
filters = {'device_id': [fip_agent_id],
- 'device_owner': [DEVICE_OWNER_AGENT_GW]}
+ 'device_owner': [l3_const.DEVICE_OWNER_AGENT_GW]}
interfaces = self._core_plugin.get_ports(context.elevated(), filters)
LOG.debug("Return the FIP ports: %s ", interfaces)
return interfaces
ports_to_populate.append(router['gw_port'])
if router.get(l3_const.FLOATINGIP_AGENT_INTF_KEY):
ports_to_populate += router[l3_const.FLOATINGIP_AGENT_INTF_KEY]
- if router.get(SNAT_ROUTER_INTF_KEY):
- ports_to_populate += router[SNAT_ROUTER_INTF_KEY]
+ if router.get(l3_const.SNAT_ROUTER_INTF_KEY):
+ ports_to_populate += router[l3_const.SNAT_ROUTER_INTF_KEY]
ports_to_populate += interfaces
self._populate_subnets_for_ports(context, ports_to_populate)
self._process_interfaces(routers_dict, interfaces)
vm_port_db = port or self._core_plugin.get_port(context, port_id)
device_owner = vm_port_db['device_owner'] if vm_port_db else ""
if (n_utils.is_dvr_serviced(device_owner) or
- device_owner == DEVICE_OWNER_AGENT_GW):
+ device_owner == l3_const.DEVICE_OWNER_AGENT_GW):
return vm_port_db[portbindings.HOST_ID]
def _get_agent_gw_ports_exist_for_network(
filters = {
'network_id': [network_id],
'device_id': [agent_id],
- 'device_owner': [DEVICE_OWNER_AGENT_GW]
+ 'device_owner': [l3_const.DEVICE_OWNER_AGENT_GW]
}
ports = self._core_plugin.get_ports(context, filters)
if ports:
self, context, host_id, ext_net_id):
"""Function to delete FIP gateway port with given ext_net_id."""
# delete any fip agent gw port
- device_filter = {'device_owner': [DEVICE_OWNER_AGENT_GW],
+ device_filter = {'device_owner': [l3_const.DEVICE_OWNER_AGENT_GW],
'network_id': [ext_net_id]}
ports = self._core_plugin.get_ports(context,
filters=device_filter)
port_data = {'tenant_id': '',
'network_id': network_id,
'device_id': l3_agent_db['id'],
- 'device_owner': DEVICE_OWNER_AGENT_GW,
+ 'device_owner': l3_const.DEVICE_OWNER_AGENT_GW,
'binding:host_id': host,
'admin_state_up': True,
'name': ''}
qry = context.session.query(l3_db.RouterPort)
qry = qry.filter_by(
router_id=router_id,
- port_type=DEVICE_OWNER_DVR_SNAT
+ port_type=l3_const.DEVICE_OWNER_ROUTER_SNAT
)
ports = [self._core_plugin._make_port_dict(rp.port, None)
'network_id': network_id,
'fixed_ips': [{'subnet_id': subnet_id}],
'device_id': router.id,
- 'device_owner': DEVICE_OWNER_DVR_SNAT,
+ 'device_owner': l3_const.DEVICE_OWNER_ROUTER_SNAT,
'admin_state_up': True,
'name': ''}
snat_port = p_utils.create_port(self._core_plugin, context,
router_port = l3_db.RouterPort(
port_id=snat_port['id'],
router_id=router.id,
- port_type=DEVICE_OWNER_DVR_SNAT
+ port_type=l3_const.DEVICE_OWNER_ROUTER_SNAT
)
context.session.add(router_port)
int_ports = (
rp.port for rp in
router.attached_ports.filter_by(
- port_type=DEVICE_OWNER_DVR_INTERFACE
+ port_type=l3_const.DEVICE_OWNER_DVR_INTERFACE
)
)
LOG.info(_LI('SNAT interface port list does not exist,'
filters = {'fixed_ips': {'subnet_id': [subnet]}}
ports = self._core_plugin.get_ports(context, filters=filters)
for port in ports:
- if port['device_owner'] == DEVICE_OWNER_DVR_INTERFACE:
+ if port['device_owner'] == l3_const.DEVICE_OWNER_DVR_INTERFACE:
router_id = port['device_id']
router_dict = self._get_router(context, router_id)
if router_dict.extra_attributes.distributed:
# changeset size since it is late in cycle
ports = (
rp.port.id for rp in
- router.attached_ports.filter_by(port_type=DEVICE_OWNER_DVR_SNAT)
+ router.attached_ports.filter_by(
+ port_type=l3_const.DEVICE_OWNER_ROUTER_SNAT)
if rp.port
)