import copy
+import netaddr
+
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
self.namespace = namespace
self.ipset_sets = {}
+ def _sanitize_addresses(self, addresses):
+ """This method converts any address to ipset format.
+
+ If an address has a mask of /0 we need to cover to it to a mask of
+ /1 as ipset does not support /0 length addresses. Instead we use two
+ /1's to represent the /0.
+ """
+ sanitized_addresses = []
+ for ip in addresses:
+ if (netaddr.IPNetwork(ip).prefixlen == 0):
+ if(netaddr.IPNetwork(ip).version == 4):
+ sanitized_addresses.append('0.0.0.0/1')
+ sanitized_addresses.append('128.0.0.0/1')
+ elif (netaddr.IPNetwork(ip).version == 6):
+ sanitized_addresses.append('::/1')
+ sanitized_addresses.append('8000::/1')
+ else:
+ sanitized_addresses.append(ip)
+ return sanitized_addresses
+
@staticmethod
def get_name(id, ethertype):
"""Returns the given ipset name for an id+ethertype pair.
add / remove new members, or swapped atomically if
that's faster.
"""
+ member_ips = self._sanitize_addresses(member_ips)
set_name = self.get_name(id, ethertype)
if not self.set_exists(id, ethertype):
# The initial creation is handled with create/refresh to
def expect_set(self, addresses):
temp_input = ['create %s hash:net family inet' % TEST_SET_NAME_NEW]
temp_input.extend('add %s %s' % (TEST_SET_NAME_NEW, ip)
- for ip in addresses)
+ for ip in self.ipset._sanitize_addresses(addresses))
input = '\n'.join(temp_input)
self.expected_calls.extend([
mock.call(['ipset', 'restore', '-exist'],
self.expected_calls.extend(
mock.call(['ipset', 'add', '-exist', TEST_SET_NAME, ip],
process_input=None,
- run_as_root=True) for ip in addresses)
+ run_as_root=True)
+ for ip in self.ipset._sanitize_addresses(addresses))
def expect_del(self, addresses):
+
self.expected_calls.extend(
mock.call(['ipset', 'del', TEST_SET_NAME, ip],
process_input=None,
- run_as_root=True) for ip in addresses)
+ run_as_root=True)
+ for ip in self.ipset._sanitize_addresses(addresses))
def expect_create(self):
self.expected_calls.append(
self.ipset.set_members(TEST_SET_ID, ETHERTYPE, FAKE_IPS)
self.verify_mock_calls()
+ def test_set_members_adding_all_zero_ipv4(self):
+ self.expect_set(['0.0.0.0/0'])
+ self.ipset.set_members(TEST_SET_ID, ETHERTYPE, ['0.0.0.0/0'])
+ self.verify_mock_calls()
+
+ def test_set_members_adding_all_zero_ipv6(self):
+ self.expect_set(['::/0'])
+ self.ipset.set_members(TEST_SET_ID, ETHERTYPE, ['::/0'])
+ self.verify_mock_calls()
+
def test_destroy(self):
self.add_first_ip()
self.expect_destroy()