from neutron.agent.linux import iptables_manager
from neutron.agent.linux import utils
from neutron.common import constants
+from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.extensions import portsecurity as psec
from neutron.i18n import _LI
try:
return self._device_zone_map[short_port_id]
except KeyError:
- self._free_zones_from_removed_ports()
return self._generate_device_zone(short_port_id)
def _free_zones_from_removed_ports(self):
def _generate_device_zone(self, short_port_id):
"""Generates a unique conntrack zone for the passed in ID."""
- zone = self._find_open_zone()
+ try:
+ zone = self._find_open_zone()
+ except n_exc.CTZoneExhaustedError:
+ # Free some zones and try again, repeat failure will not be caught
+ self._free_zones_from_removed_ports()
+ zone = self._find_open_zone()
+
self._device_zone_map[short_port_id] = zone
LOG.debug("Assigned CT zone %(z)s to port %(dev)s.",
{'z': zone, 'dev': short_port_id})
# gap found, let's use it!
return index + 1
# conntrack zones exhausted :( :(
- raise RuntimeError("iptables conntrack zones exhausted. "
- "iptables rules cannot be applied.")
+ raise n_exc.CTZoneExhaustedError()
class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
from neutron.agent.linux import iptables_firewall
from neutron.agent import securitygroups_rpc as sg_cfg
from neutron.common import constants
+from neutron.common import exceptions as n_exc
from neutron.tests import base
from neutron.tests.unit.api.v2 import test_base
cmd.extend(['-d', 'fe80::1'])
else:
cmd.extend(['-s', 'fe80::1'])
- cmd.extend(['-w', 1])
+ # initial data has 1, 2, and 9 in use, CT zone will start at 10.
+ cmd.extend(['-w', 10])
calls = [
mock.call(cmd, run_as_root=True, check_exit_code=True,
extra_ok_codes=[1])]
self.firewall.filtered_ports[port['device']] = new_port
self.firewall.filter_defer_apply_off()
calls = [
+ # initial data has 1, 2, and 9 in use, CT zone will start at 10.
mock.call(['conntrack', '-D', '-f', 'ipv4', '-d', '10.0.0.1',
- '-w', 1],
+ '-w', 10],
run_as_root=True, check_exit_code=True,
extra_ok_codes=[1]),
mock.call(['conntrack', '-D', '-f', 'ipv6', '-d', 'fe80::1',
- '-w', 1],
+ '-w', 10],
run_as_root=True, check_exit_code=True,
extra_ok_codes=[1])]
self.utils_exec.assert_has_calls(calls)
def setUp(self):
super(OVSHybridIptablesFirewallTestCase, self).setUp()
self.firewall = iptables_firewall.OVSHybridIptablesFirewallDriver()
+ # inital data has 1, 2, and 9 in use, see RAW_TABLE_OUTPUT above.
+ self._dev_zone_map = {'61634509-31': 2, '8f46cf18-12': 9,
+ '95c24827-02': 2, 'e804433b-61': 1}
def test__populate_initial_zone_map(self):
- expected = {'61634509-31': 2, '8f46cf18-12': 9,
- '95c24827-02': 2, 'e804433b-61': 1}
- self.assertEqual(expected, self.firewall._device_zone_map)
+ self.assertEqual(self._dev_zone_map, self.firewall._device_zone_map)
def test__generate_device_zone(self):
# inital data has 1, 2, and 9 in use.
# fill it up and then make sure an extra throws an error
for i in range(1, 65536):
self.firewall._device_zone_map['dev-%s' % i] = i
- with testtools.ExpectedException(RuntimeError):
+ with testtools.ExpectedException(n_exc.CTZoneExhaustedError):
self.firewall._find_open_zone()
+ # with it full, try again, this should trigger a cleanup and return 1
+ self.assertEqual(1, self.firewall._generate_device_zone('p12'))
+ self.assertEqual({'p12': 1}, self.firewall._device_zone_map)
+
def test_get_device_zone(self):
- # calling get_device_zone should clear out all of the other entries
- # since they aren't in the filtered ports list
- self.assertEqual(1, self.firewall.get_device_zone('12345678901234567'))
+ # initial data has 1, 2, and 9 in use.
+ self.assertEqual(10,
+ self.firewall.get_device_zone('12345678901234567'))
# should have been truncated to 11 chars
- self.assertEqual({'12345678901': 1}, self.firewall._device_zone_map)
+ self._dev_zone_map.update({'12345678901': 10})
+ self.assertEqual(self._dev_zone_map, self.firewall._device_zone_map)