self.ICMP: self._test_icmp_connectivity,
self.ARP: self._test_arp_connectivity}
self._nc_testers = dict()
+ self._pingers = dict()
self.addCleanup(self.cleanup)
def cleanup(self):
for nc in self._nc_testers.values():
nc.stop_processes()
+ for pinger in self._pingers.values():
+ pinger.stop()
@property
def vm_namespace(self):
def vm_mac_address(self, mac_address):
self._vm.mac_address = mac_address
+ @property
+ def peer_mac_address(self):
+ return self._peer.port.link.address
+
+ @peer_mac_address.setter
+ def peer_mac_address(self, mac_address):
+ self._peer.mac_address = mac_address
+
@property
def peer_namespace(self):
return self._peer.namespace
self._nc_testers[nc_key] = nc_tester
return nc_tester
+ def _get_pinger(self, direction):
+ try:
+ pinger = self._pingers[direction]
+ except KeyError:
+ src_namespace, dst_address = self._get_namespace_and_address(
+ direction)
+ pinger = net_helpers.Pinger(src_namespace, dst_address)
+ self._pingers[direction] = pinger
+ return pinger
+
+ def start_sending_icmp(self, direction):
+ pinger = self._get_pinger(direction)
+ pinger.start()
+
+ def stop_sending_icmp(self, direction):
+ pinger = self._get_pinger(direction)
+ pinger.stop()
+
+ def get_sent_icmp_packets(self, direction):
+ pinger = self._get_pinger(direction)
+ return pinger.sent
+
+ def get_received_icmp_packets(self, direction):
+ pinger = self._get_pinger(direction)
+ return pinger.received
+
class LinuxBridgeConnectionTester(ConnectionTester):
"""Tester with linux bridge in the middle
def vm_port_id(self):
return net_helpers.VethFixture.get_peer_name(self._vm.port.name)
+ @property
+ def peer_port_id(self):
+ return net_helpers.VethFixture.get_peer_name(self._peer.port.name)
+
def flush_arp_tables(self):
self._bridge.neigh.flush(4, 'all')
super(LinuxBridgeConnectionTester, self).flush_arp_tables()
self.child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=True)
+ @property
+ def is_running(self):
+ return self.poll() is None
+
+
+class Pinger(object):
+ """Class for sending ICMP packets asynchronously
+
+ The aim is to keep sending ICMP packets on background while executing other
+ code. After background 'ping' command is stopped, statistics are available.
+
+ Difference to assert_(no_)ping() functions located in this module is that
+ these methods send given count of ICMP packets while they wait for the
+ exit code of 'ping' command.
+
+ >>> pinger = Pinger('pinger_test', '192.168.0.2')
+
+ >>> pinger.start(); time.sleep(5); pinger.stop()
+
+ >>> pinger.sent, pinger.received
+ 7 7
+
+ """
+
+ stats_pattern = re.compile(
+ r'^(?P<trans>\d+) packets transmitted,.*(?P<recv>\d+) received.*$')
+ TIMEOUT = 15
+
+ def __init__(self, namespace, address, count=None, timeout=1):
+ self.proc = None
+ self.namespace = namespace
+ self.address = address
+ self.count = count
+ self.timeout = timeout
+ self.sent = 0
+ self.received = 0
+
+ def _wait_for_death(self):
+ is_dead = lambda: self.proc.poll() is not None
+ utils.wait_until_true(
+ is_dead, timeout=self.TIMEOUT, exception=RuntimeError(
+ "Ping command hasn't ended after %d seconds." % self.TIMEOUT))
+
+ def _parse_stats(self):
+ for line in self.proc.stdout:
+ result = self.stats_pattern.match(line)
+ if result:
+ self.sent = int(result.group('trans'))
+ self.received = int(result.group('recv'))
+ break
+ else:
+ raise RuntimeError("Didn't find ping statistics.")
+
+ def start(self):
+ if self.proc and self.proc.is_running:
+ raise RuntimeError("This pinger has already a running process")
+ ip_version = ip_lib.get_ip_version(self.address)
+ ping_exec = 'ping' if ip_version == 4 else 'ping6'
+ cmd = [ping_exec, self.address, '-W', str(self.timeout)]
+ if self.count:
+ cmd.extend(['-c', str(self.count)])
+ self.proc = RootHelperProcess(cmd, namespace=self.namespace)
+
+ def stop(self):
+ if self.proc and self.proc.is_running:
+ self.proc.kill(signal.SIGINT)
+ self._wait_for_death()
+ self._parse_stats()
+
class NetcatTester(object):
TCP = n_const.PROTO_NAME_TCP
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, list())
self.tester.assert_established_connection(**connection)
+
+ def test_preventing_firewall_blink(self):
+ direction = self.tester.INGRESS
+ sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
+ 'protocol': 'tcp'}]
+ self.tester.start_sending_icmp(direction)
+ self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+ self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, {})
+ self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
+ self.tester.stop_sending_icmp(direction)
+ packets_sent = self.tester.get_sent_icmp_packets(direction)
+ packets_received = self.tester.get_received_icmp_packets(direction)
+ self.assertGreater(packets_sent, 0)
+ self.assertEqual(0, packets_received)
+
+ def test_remote_security_groups(self):
+ remote_sg_id = 'remote_sg_id'
+ peer_port_desc = self._create_port_description(
+ self.tester.peer_port_id,
+ [self.tester.peer_ip_address],
+ self.tester.peer_mac_address,
+ [remote_sg_id])
+ self.firewall.prepare_port_filter(peer_port_desc)
+
+ peer_sg_rules = [{'ethertype': 'IPv4', 'direction': 'egress',
+ 'protocol': 'icmp'}]
+ self._apply_security_group_rules(remote_sg_id, peer_sg_rules)
+
+ vm_sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
+ 'protocol': 'icmp', 'remote_group_id': remote_sg_id}]
+ self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID,
+ vm_sg_rules)
+
+ vm_sg_members = {'IPv4': [self.tester.peer_ip_address]}
+ with self.firewall.defer_apply():
+ self.firewall.update_security_group_members(
+ remote_sg_id, vm_sg_members)
+
+ self.tester.assert_connection(protocol=self.tester.ICMP,
+ direction=self.tester.INGRESS)
+ self.tester.assert_no_connection(protocol=self.tester.TCP,
+ direction=self.tester.INGRESS)
+ self.tester.assert_no_connection(protocol=self.tester.ICMP,
+ direction=self.tester.EGRESS)