# License for the specific language governing permissions and limitations
# under the License.
+import binascii
import netaddr
import os
FIP_PR_END = FIP_PR_START + 40000
# Route Table index for FIPs
FIP_RT_TBL = 16
+# xor-folding mask used for IPv6 rule index
+MASK_30 = 0x3fffffff
class RouterMixin(object):
router['id'])
return host
+ def _get_snat_idx(self, ip_cidr):
+ """Generate index for DVR snat rules and route tables.
+
+ The index value has to be 32 bits or less but more than the system
+ generated entries i.e. 32768. For IPv4 use the numeric value of the
+ cidr. For IPv6 generate a crc32 bit hash and xor-fold to 30 bits.
+ Use the freed range to extend smaller values so that they become
+ greater than system generated entries.
+ """
+ net = netaddr.IPNetwork(ip_cidr)
+ if net.version == 6:
+ # the crc32 & 0xffffffff is for Python 2.6 and 3.0 compatibility
+ snat_idx = binascii.crc32(ip_cidr) & 0xffffffff
+ # xor-fold the hash to reserve upper range to extend smaller values
+ snat_idx = (snat_idx >> 30) ^ (snat_idx & MASK_30)
+ if snat_idx < 32768:
+ snat_idx = snat_idx + MASK_30
+ else:
+ snat_idx = net.value
+ return snat_idx
+
def _map_internal_interfaces(self, ri, int_port, snat_ports):
"""Return the SNAT port for the given internal interface port."""
fixed_ip = int_port['fixed_ips'][0]
def _snat_redirect_add(self, ri, gateway, sn_port, sn_int):
"""Adds rules and routes for SNAT redirection."""
try:
- snat_idx = netaddr.IPNetwork(sn_port['ip_cidr']).value
+ snat_idx = self._get_snat_idx(sn_port['ip_cidr'])
ns_ipr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
ns_ipd = ip_lib.IPDevice(sn_int, self.root_helper,
namespace=ri.ns_name)
def _snat_redirect_remove(self, ri, sn_port, sn_int):
"""Removes rules and routes for SNAT redirection."""
try:
- snat_idx = netaddr.IPNetwork(sn_port['ip_cidr']).value
+ snat_idx = self._get_snat_idx(sn_port['ip_cidr'])
ns_ipr = ip_lib.IpRule(self.root_helper, namespace=ri.ns_name)
ns_ipd = ip_lib.IPDevice(sn_int, self.root_helper,
namespace=ri.ns_name)
else:
self.assertIn(r.rule, expected_rules)
+ def test__get_snat_idx_ipv4(self):
+ ip_cidr = '101.12.13.00/24'
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ snat_idx = agent._get_snat_idx(ip_cidr)
+ # 0x650C0D00 is numerical value of 101.12.13.00
+ self.assertEqual(0x650C0D00, snat_idx)
+
+ def test__get_snat_idx_ipv6(self):
+ ip_cidr = '2620:0:a03:e100::/64'
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ snat_idx = agent._get_snat_idx(ip_cidr)
+ # 0x3D345705 is 30 bit xor folded crc32 of the ip_cidr
+ self.assertEqual(0x3D345705, snat_idx)
+
+ def test__get_snat_idx_ipv6_below_32768(self):
+ ip_cidr = 'd488::/30'
+ # crc32 of this ip_cidr is 0x1BD7
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ snat_idx = agent._get_snat_idx(ip_cidr)
+ # 0x1BD7 + 0x3FFFFFFF = 0x40001BD6
+ self.assertEqual(0x40001BD6, snat_idx)
+
def test__map_internal_interfaces(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = prepare_router_data(num_internal_ports=4)