INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-'
RPC_LOOP_INTERVAL = 1
-FLOATING_IP_CIDR_SUFFIX = '/32'
class L3PluginApi(object):
def _add_floating_ip(self, ri, fip, interface_name, device):
fip_ip = fip['floating_ip_address']
- ip_cidr = str(fip_ip) + FLOATING_IP_CIDR_SUFFIX
+ ip_cidr = common_utils.ip_to_cidr(fip_ip)
if ri.is_ha:
self._add_vip(ri, ip_cidr, interface_name)
# Loop once to ensure that floating ips are configured.
for fip in floating_ips:
fip_ip = fip['floating_ip_address']
- ip_cidr = str(fip_ip) + FLOATING_IP_CIDR_SUFFIX
+ ip_cidr = common_utils.ip_to_cidr(fip_ip)
new_cidrs.add(ip_cidr)
fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ACTIVE
if ip_cidr not in existing_cidrs:
ri, fip, interface_name, device)
fips_to_remove = (
- ip_cidr for ip_cidr in existing_cidrs - new_cidrs if
- ip_cidr.endswith(FLOATING_IP_CIDR_SUFFIX))
+ ip_cidr for ip_cidr in existing_cidrs - new_cidrs
+ if common_utils.is_cidr_host(ip_cidr))
for ip_cidr in fips_to_remove:
self._remove_floating_ip(ri, device, ip_cidr)
# This avoids unnecessarily removing those addresses and
# causing a momentarily network outage.
floating_ips = self.get_floating_ips(ri)
- preserve_ips = [ip['floating_ip_address'] + FLOATING_IP_CIDR_SUFFIX
+ preserve_ips = [common_utils.ip_to_cidr(ip['floating_ip_address'])
for ip in floating_ips]
self._external_gateway_added(ri, ex_gw_port, interface_name,
else:
ns_name = ri.ns_name
floating_ips = self.get_floating_ips(ri)
- preserve_ips = [ip['floating_ip_address'] + FLOATING_IP_CIDR_SUFFIX
+ preserve_ips = [common_utils.ip_to_cidr(ip['floating_ip_address'])
for ip in floating_ips]
self._external_gateway_added(ri, ex_gw_port, interface_name,
IPv4 = 'IPv4'
IPv6 = 'IPv6'
+IPv4_BITS = 32
+IPv6_BITS = 128
DHCP_RESPONSE_PORT = 68
import hashlib
import logging as std_logging
import multiprocessing
+import netaddr
import os
import random
import signal
'port': conf.auth_port})
# NOTE(ihrachys): all existing consumers assume version 2.0
return '%s/v2.0/' % auth_uri
+
+
+def ip_to_cidr(ip, prefix=None):
+ """Convert an ip with no prefix to cidr notation
+
+ :param ip: An ipv4 or ipv6 address. Convertable to netaddr.IPNetwork.
+ :param prefix: Optional prefix. If None, the default 32 will be used for
+ ipv4 and 128 for ipv6.
+ """
+ net = netaddr.IPNetwork(ip)
+ if prefix is not None:
+ # Can't pass ip and prefix separately. Must concatenate strings.
+ net = netaddr.IPNetwork(str(net.ip) + '/' + str(prefix))
+ return str(net)
+
+
+def is_cidr_host(cidr):
+ """Determines if the cidr passed in represents a single host network
+
+ :param cidr: Either an ipv4 or ipv6 cidr.
+ :returns: True if the cidr is /32 for ipv4 or /128 for ipv6.
+ :raises ValueError: raises if cidr does not contain a '/'. This disallows
+ plain IP addresses specifically to avoid ambiguity.
+ """
+ if '/' not in str(cidr):
+ raise ValueError("cidr doesn't contain a '/'")
+ net = netaddr.IPNetwork(cidr)
+ if net.version == 4:
+ return net.prefixlen == q_const.IPv4_BITS
+ return net.prefixlen == q_const.IPv6_BITS
import webob.exc
from neutron.agent.common import config as agent_config
-from neutron.agent.l3 import agent as l3_agent
from neutron.agent import l3_agent as l3_agent_main
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.metadata import agent as metadata_agent
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
+from neutron.common import utils as common_utils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron.services import advanced_service as adv_svc
internal_device_name = self.agent.get_internal_device_name(
internal_port['id'])
internal_device_cidr = internal_port['ip_cidr']
- floating_ip_cidr = (
- self.agent.get_floating_ips(router)[0]
- ['floating_ip_address'] + l3_agent.FLOATING_IP_CIDR_SUFFIX)
+ floating_ip_cidr = common_utils.ip_to_cidr(
+ self.agent.get_floating_ips(router)[0]['floating_ip_address'])
default_gateway_ip = external_port['subnet'].get('gateway_ip')
return """vrrp_sync_group VG_1 {
import eventlet
import mock
+import netaddr
import testtools
from neutron.common import constants
def test_is_dvr_serviced_with_vm_port(self):
self._test_is_dvr_serviced('compute:', True)
+
+
+class TestIpToCidr(base.BaseTestCase):
+ def test_ip_to_cidr_ipv4_default(self):
+ self.assertEqual('15.1.2.3/32', utils.ip_to_cidr('15.1.2.3'))
+
+ def test_ip_to_cidr_ipv4_prefix(self):
+ self.assertEqual('15.1.2.3/24', utils.ip_to_cidr('15.1.2.3', 24))
+
+ def test_ip_to_cidr_ipv4_netaddr(self):
+ ip_address = netaddr.IPAddress('15.1.2.3')
+ self.assertEqual('15.1.2.3/32', utils.ip_to_cidr(ip_address))
+
+ def test_ip_to_cidr_ipv4_bad_prefix(self):
+ self.assertRaises(netaddr.core.AddrFormatError,
+ utils.ip_to_cidr, '15.1.2.3', 33)
+
+ def test_ip_to_cidr_ipv6_default(self):
+ self.assertEqual('::1/128', utils.ip_to_cidr('::1'))
+
+ def test_ip_to_cidr_ipv6_prefix(self):
+ self.assertEqual('::1/64', utils.ip_to_cidr('::1', 64))
+
+ def test_ip_to_cidr_ipv6_bad_prefix(self):
+ self.assertRaises(netaddr.core.AddrFormatError,
+ utils.ip_to_cidr, '2000::1', 129)
+
+
+class TestCidrIsHost(base.BaseTestCase):
+ def test_is_cidr_host_ipv4(self):
+ self.assertTrue(utils.is_cidr_host('15.1.2.3/32'))
+
+ def test_is_cidr_host_ipv4_not_cidr(self):
+ self.assertRaises(ValueError,
+ utils.is_cidr_host,
+ '15.1.2.3')
+
+ def test_is_cidr_host_ipv6(self):
+ self.assertTrue(utils.is_cidr_host('2000::1/128'))
+
+ def test_is_cidr_host_ipv6_netaddr(self):
+ net = netaddr.IPNetwork("2000::1")
+ self.assertTrue(utils.is_cidr_host(net))
+
+ def test_is_cidr_host_ipv6_32(self):
+ self.assertFalse(utils.is_cidr_host('2000::1/32'))
+
+ def test_is_cidr_host_ipv6_not_cidr(self):
+ self.assertRaises(ValueError,
+ utils.is_cidr_host,
+ '2000::1')
+
+ def test_is_cidr_host_ipv6_not_cidr_netaddr(self):
+ ip_address = netaddr.IPAddress("2000::3")
+ self.assertRaises(ValueError,
+ utils.is_cidr_host,
+ ip_address)
from neutron.common import config as base_config
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
+from neutron.common import utils as common_utils
from neutron.i18n import _LE
from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
'port_id': _uuid()}
agent.agent_gateway_port = agent_gw_port
ri.rtr_fip_subnet = lla.LinkLocalAddressPair('169.254.30.42/31')
- ip_cidr = str(fip['floating_ip_address']) + (
- l3_agent.FLOATING_IP_CIDR_SUFFIX)
+ ip_cidr = common_utils.ip_to_cidr(fip['floating_ip_address'])
agent.floating_ip_added_dist(ri, fip, ip_cidr)
self.mock_rule.add_rule_from.assert_called_with('192.168.0.1',
16, FIP_PRI)