class NetcatTester(object):
TESTING_STRING = 'foo'
- def __init__(self, client_namespace, server_namespace, address, port,
- root_helper='', udp=False):
+ def __init__(
+ self, client_namespace, server_namespace, server_address, port,
+ client_address=None, root_helper='', udp=False):
self.client_namespace = client_namespace
self.server_namespace = server_namespace
self._client_process = None
self._server_process = None
- self.address = address
+ # Use client_address to specify an address to connect client to that is
+ # different from the one that server side is going to listen on (useful
+ # when testing floating IPs)
+ self.client_address = client_address or server_address
+ self.server_address = server_address
self.port = str(port)
self.root_helper = root_helper
self.udp = udp
if not self._server_process:
self._spawn_server_process()
self._client_process = self._spawn_nc_in_namespace(
- self.client_namespace.namespace)
+ self.client_namespace.namespace,
+ address=self.client_address)
return self._client_process
@property
def _spawn_server_process(self):
self._server_process = self._spawn_nc_in_namespace(
- self.server_namespace.namespace, listen=True)
+ self.server_namespace.namespace,
+ address=self.server_address,
+ listen=True)
def test_connectivity(self, respawn=False):
stop_required = (respawn and self._client_process and
return message == self.TESTING_STRING
- def _spawn_nc_in_namespace(self, namespace, listen=False):
- cmd = ['nc', self.address, self.port]
+ def _spawn_nc_in_namespace(self, namespace, address, listen=False):
+ cmd = ['nc', address, self.port]
if self.udp:
cmd.append('-u')
if listen:
def _test_with_nc(self, fw_manager, direction, port, udp):
netcat = helpers.NetcatTester(self.client_ns, self.server_ns,
self.DST_ADDRESS, self.port,
- self.root_helper, udp=udp)
+ root_helper=self.root_helper,
+ udp=udp)
self.addCleanup(netcat.stop_processes)
protocol = 'tcp'
if udp:
import mock
import netaddr
from oslo_config import cfg
+import testtools
import webob
import webob.dec
import webob.exc
def test_ha_router_lifecycle(self):
self._router_lifecycle(enable_ha=True)
+ def test_conntrack_disassociate_fip(self):
+ '''Test that conntrack immediately drops stateful connection
+ that uses floating IP once it's disassociated.
+ '''
+ router_info = self.generate_router_info(enable_ha=False)
+ router = self.manage_router(self.agent, router_info)
+
+ port = helpers.get_free_namespace_port(router.ns_name)
+ client_address = '19.4.4.3'
+ server_address = '35.4.0.4'
+
+ def clean_fips(router):
+ router.router[l3_constants.FLOATINGIP_KEY] = []
+
+ clean_fips(router)
+ self._add_fip(router, client_address, fixed_address=server_address)
+ self.agent.process_router(router)
+
+ router_ns = ip_lib.IPWrapper(self.root_helper,
+ namespace=router.ns_name)
+ netcat = helpers.NetcatTester(router_ns, router_ns,
+ server_address, port,
+ client_address=client_address,
+ root_helper=self.root_helper,
+ udp=False)
+ self.addCleanup(netcat.stop_processes)
+
+ def assert_num_of_conntrack_rules(n):
+ out = router_ns.netns.execute(["conntrack", "-L",
+ "--orig-src", client_address])
+ self.assertEqual(
+ n, len([line for line in out.strip().split('\n') if line]))
+
+ with self.assert_max_execution_time(15):
+ assert_num_of_conntrack_rules(0)
+
+ self.assertTrue(netcat.test_connectivity())
+ assert_num_of_conntrack_rules(1)
+
+ clean_fips(router)
+ self.agent.process_router(router)
+ assert_num_of_conntrack_rules(0)
+
+ with testtools.ExpectedException(RuntimeError):
+ netcat.test_connectivity()
+
def test_keepalived_configuration(self):
router_info = self.generate_router_info(enable_ha=True)
router = self.manage_router(self.agent, router_info)