super(DvrEdgeRouter, self).external_gateway_added(
ex_gw_port, interface_name)
if self._is_this_snat_host():
- snat_ports = self.get_snat_interfaces()
- self._create_dvr_gateway(ex_gw_port, interface_name, snat_ports)
+ self._create_dvr_gateway(ex_gw_port, interface_name)
def external_gateway_updated(self, ex_gw_port, interface_name):
if not self._is_this_snat_host():
if not self._is_this_snat_host():
return
- snat_ports = self.get_snat_interfaces()
- sn_port = self._map_internal_interfaces(port, snat_ports)
+ sn_port = self.get_snat_port_for_internal_port(port)
if not sn_port:
return
if not self.ex_gw_port:
return
- snat_ports = self.get_snat_interfaces()
- sn_port = self._map_internal_interfaces(port, snat_ports)
+ sn_port = self.get_snat_port_for_internal_port(port)
if not sn_port:
return
self.driver.unplug(snat_interface, namespace=ns_name,
prefix=prefix)
- def _create_dvr_gateway(self, ex_gw_port, gw_interface_name,
- snat_ports):
+ def _create_dvr_gateway(self, ex_gw_port, gw_interface_name):
"""Create SNAT namespace."""
snat_ns = self.create_snat_namespace()
# connect snat_ports to br_int from SNAT namespace
- for port in snat_ports:
+ for port in self.get_snat_interfaces():
# create interface_name
interface_name = self.get_snat_int_device_name(port['id'])
self._internal_network_added(
from oslo_utils import excutils
from neutron.agent.l3 import dvr_fip_ns
-from neutron.agent.l3 import router_info as router
+from neutron.agent.l3 import dvr_router_base
from neutron.agent.linux import ip_lib
from neutron.common import constants as l3_constants
from neutron.common import exceptions
MASK_30 = 0x3fffffff
-class DvrLocalRouter(router.RouterInfo):
+class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def __init__(self, agent, host, *args, **kwargs):
- super(DvrLocalRouter, self).__init__(*args, **kwargs)
-
- self.agent = agent
- self.host = host
+ super(DvrLocalRouter, self).__init__(agent, host, *args, **kwargs)
self.floating_ips_dict = {}
# Linklocal subnet for router and floating IP namespace link
floating_ips = super(DvrLocalRouter, self).get_floating_ips()
return [i for i in floating_ips if i['host'] == self.host]
- def get_snat_interfaces(self):
- return self.router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
-
def _handle_fip_nat_rules(self, interface_name, action):
"""Configures NAT rules for Floating IPs for DVR.
subnet_id,
'add')
- def _map_internal_interfaces(self, int_port, snat_ports):
- """Return the SNAT port for the given internal interface port."""
- fixed_ip = int_port['fixed_ips'][0]
- subnet_id = fixed_ip['subnet_id']
- match_port = [p for p in snat_ports if
- p['fixed_ips'][0]['subnet_id'] == subnet_id]
- if match_port:
- return match_port[0]
- else:
- LOG.error(_LE('DVR: no map match_port found!'))
-
@staticmethod
def _get_snat_idx(ip_cidr):
"""Generate index for DVR snat rules and route tables.
if not ex_gw_port:
return
- snat_ports = self.get_snat_interfaces()
- sn_port = self._map_internal_interfaces(port, snat_ports)
+ sn_port = self.get_snat_port_for_internal_port(port)
if not sn_port:
return
if not self.ex_gw_port:
return
- snat_ports = self.get_snat_interfaces()
- sn_port = self._map_internal_interfaces(port, snat_ports)
+ sn_port = self.get_snat_port_for_internal_port(port)
if not sn_port:
return
ip_wrapr = ip_lib.IPWrapper(namespace=self.ns_name)
ip_wrapr.netns.execute(['sysctl', '-w',
'net.ipv4.conf.all.send_redirects=0'])
- snat_ports = self.get_snat_interfaces()
for p in self.internal_ports:
- gateway = self._map_internal_interfaces(p, snat_ports)
+ gateway = self.get_snat_port_for_internal_port(p)
id_name = self.get_internal_device_name(p['id'])
if gateway:
self._snat_redirect_add(gateway, p, id_name)
- for port in snat_ports:
+ for port in self.get_snat_interfaces():
for ip in port['fixed_ips']:
self._update_arp_entry(ip['ip_address'],
port['mac_address'],
to_fip_interface_name = (
self.get_external_device_interface_name(ex_gw_port))
self.process_floating_ip_addresses(to_fip_interface_name)
- snat_ports = self.get_snat_interfaces()
for p in self.internal_ports:
- gateway = self._map_internal_interfaces(p, snat_ports)
+ gateway = self.get_snat_port_for_internal_port(p)
internal_interface = self.get_internal_device_name(p['id'])
self._snat_redirect_remove(gateway, p, internal_interface)
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import log as logging
+
+from neutron.agent.l3 import router_info as router
+from neutron.common import constants as l3_constants
+from neutron.i18n import _LE
+
+LOG = logging.getLogger(__name__)
+
+
+class DvrRouterBase(router.RouterInfo):
+ def __init__(self, agent, host, *args, **kwargs):
+ super(DvrRouterBase, self).__init__(*args, **kwargs)
+
+ self.agent = agent
+ self.host = host
+
+ def get_snat_interfaces(self):
+ return self.router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
+
+ def get_snat_port_for_internal_port(self, int_port):
+ """Return the SNAT port for the given internal interface port."""
+ snat_ports = self.get_snat_interfaces()
+ fixed_ip = int_port['fixed_ips'][0]
+ subnet_id = fixed_ip['subnet_id']
+ match_port = [p for p in snat_ports
+ if p['fixed_ips'][0]['subnet_id'] == subnet_id]
+ if match_port:
+ return match_port[0]
+ else:
+ LOG.error(_LE('DVR: no map match_port found!'))
if action == 'add':
self.device_exists.return_value = False
- ri._map_internal_interfaces = mock.Mock(return_value=sn_port)
+ ri.get_snat_port_for_internal_port = mock.Mock(
+ return_value=sn_port)
ri._snat_redirect_add = mock.Mock()
ri._set_subnet_arp_info = mock.Mock()
ri._internal_network_added = mock.Mock()
dvr_snat_ns.SNAT_INT_DEV_PREFIX)
elif action == 'remove':
self.device_exists.return_value = False
- ri._map_internal_interfaces = mock.Mock(return_value=sn_port)
+ ri.get_snat_port_for_internal_port = mock.Mock(
+ return_value=sn_port)
ri._snat_redirect_modify = mock.Mock()
ri.internal_network_removed(port)
ri._snat_redirect_modify.assert_called_with(
interface_name, ip_cidrs, **kwargs)
else:
ri._create_dvr_gateway.assert_called_once_with(
- ex_gw_port, interface_name,
- self.snat_ports)
+ ex_gw_port, interface_name)
def _test_external_gateway_action(self, action, router, dual_stack=False):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
elif action == 'remove':
self.device_exists.return_value = True
- ri._map_internal_interfaces = mock.Mock(return_value=sn_port)
+ ri.get_snat_port_for_internal_port = mock.Mock(
+ return_value=sn_port)
ri._snat_redirect_remove = mock.Mock()
ri.external_gateway_removed(ex_gw_port, interface_name)
if not router.get('distributed'):
else:
self.assertIn(r.rule, expected_rules)
- def test__map_internal_interfaces(self):
+ def test_get_snat_port_for_internal_port(self):
router = l3_test_common.prepare_router_data(num_internal_ports=4)
ri = dvr_router.DvrEdgeRouter(mock.sentinel.agent,
HOSTNAME,
'ip_address': '101.12.13.14'}]}
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
# test valid case
- res_port = ri._map_internal_interfaces(internal_ports[0], [test_port])
- self.assertEqual(test_port, res_port)
- # test invalid case
- test_port['fixed_ips'][0]['subnet_id'] = 1234
- res_ip = ri._map_internal_interfaces(internal_ports[0], [test_port])
- self.assertNotEqual(test_port, res_ip)
- self.assertIsNone(res_ip)
+ with mock.patch.object(ri, 'get_snat_interfaces') as get_interfaces:
+ get_interfaces.return_value = [test_port]
+ res_port = ri.get_snat_port_for_internal_port(internal_ports[0])
+ self.assertEqual(test_port, res_port)
+ # test invalid case
+ test_port['fixed_ips'][0]['subnet_id'] = 1234
+ res_ip = ri.get_snat_port_for_internal_port(internal_ports[0])
+ self.assertNotEqual(test_port, res_ip)
+ self.assertIsNone(res_ip)
def test_process_cent_router(self):
router = l3_test_common.prepare_router_data()
interface_name = ri.get_snat_int_device_name(port_id)
self.device_exists.return_value = False
- ri._create_dvr_gateway(dvr_gw_port, interface_name, self.snat_ports)
+ with mock.patch.object(ri, 'get_snat_interfaces') as get_interfaces:
+ get_interfaces.return_value = self.snat_ports
+ ri._create_dvr_gateway(dvr_gw_port, interface_name)
# check 2 internal ports are plugged
# check 1 ext-gw-port is plugged