comment_rule = iptables_manager.comment_rule
-def port_needs_l3_security(port):
- if port['fixed_ips'] or port.get('allowed_address_pairs'):
- return True
- else:
- return False
-
-
class IptablesFirewallDriver(firewall.FirewallDriver):
"""Driver which enforces security groups through iptables rules."""
IPTABLES_DIRECTION = {firewall.INGRESS_DIRECTION: 'physdev-out',
mac_ipv6_pairs.append((mac, ip_address))
def _spoofing_rule(self, port, ipv4_rules, ipv6_rules):
- if port_needs_l3_security(port):
- # Allow dhcp client packets
- ipv4_rules += [comment_rule('-p udp -m udp --sport 68 --dport 67 '
- '-j RETURN', comment=ic.DHCP_CLIENT)]
- # Drop Router Advts from the port.
- ipv6_rules += [comment_rule('-p icmpv6 --icmpv6-type %s '
- '-j DROP' % constants.ICMPV6_TYPE_RA,
- comment=ic.IPV6_RA_DROP)]
- ipv6_rules += [comment_rule('-p icmpv6 -j RETURN',
- comment=ic.IPV6_ICMP_ALLOW)]
- ipv6_rules += [comment_rule('-p udp -m udp --sport 546 --dport '
- '547 -j RETURN',
- comment=ic.DHCP_CLIENT)]
-
+ # Allow dhcp client packets
+ ipv4_rules += [comment_rule('-p udp -m udp --sport 68 --dport 67 '
+ '-j RETURN', comment=ic.DHCP_CLIENT)]
+ # Drop Router Advts from the port.
+ ipv6_rules += [comment_rule('-p icmpv6 --icmpv6-type %s '
+ '-j DROP' % constants.ICMPV6_TYPE_RA,
+ comment=ic.IPV6_RA_DROP)]
+ ipv6_rules += [comment_rule('-p icmpv6 -j RETURN',
+ comment=ic.IPV6_ICMP_ALLOW)]
+ ipv6_rules += [comment_rule('-p udp -m udp --sport 546 --dport 547 '
+ '-j RETURN', comment=ic.DHCP_CLIENT)]
mac_ipv4_pairs = []
mac_ipv6_pairs = []
ipv6_iptables_rules)
elif direction == firewall.INGRESS_DIRECTION:
ipv6_iptables_rules += self._accept_inbound_icmpv6()
-
- if port_needs_l3_security(port):
- # include IPv4 and IPv6 iptable rules from security group
- ipv4_iptables_rules += self._convert_sgr_to_iptables_rules(
- ipv4_sg_rules)
- ipv6_iptables_rules += self._convert_sgr_to_iptables_rules(
- ipv6_sg_rules)
-
+ # include IPv4 and IPv6 iptable rules from security group
+ ipv4_iptables_rules += self._convert_sgr_to_iptables_rules(
+ ipv4_sg_rules)
+ ipv6_iptables_rules += self._convert_sgr_to_iptables_rules(
+ ipv6_sg_rules)
# finally add the rules to the port chain for a given direction
self._add_rules_to_chain_v4v6(self._port_chain_name(port, direction),
ipv4_iptables_rules,
self._spoofing_rule(port,
ipv4_iptables_rules,
ipv6_iptables_rules)
- if port_needs_l3_security(port):
- self._drop_dhcp_rule(ipv4_iptables_rules, ipv6_iptables_rules)
+ self._drop_dhcp_rule(ipv4_iptables_rules, ipv6_iptables_rules)
def _update_ipset_members(self, security_group_ids):
for ip_version, sg_ids in security_group_ids.items():
from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import ip_lib
-from neutron.agent.linux.iptables_firewall import port_needs_l3_security
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
addresses |= {p['ip_address']
for p in port_details['allowed_address_pairs']}
- if not port_needs_l3_security(port_details):
- return
-
addresses = {ip for ip in addresses
if netaddr.IPNetwork(ip).version == 4}
if any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in addresses):
'--physdev-is-bridged '
'-j $ifake_dev',
comment=ic.SG_TO_VM_SG),
+ mock.call.add_rule(
+ 'ifake_dev',
+ '-m state --state INVALID -j DROP', comment=None),
+ mock.call.add_rule(
+ 'ifake_dev',
+ '-m state --state RELATED,ESTABLISHED -j RETURN',
+ comment=None),
+ mock.call.add_rule('ifake_dev', '-j $sg-fallback',
+ comment=None),
mock.call.add_chain('ofake_dev'),
mock.call.add_rule('FORWARD',
'-m physdev --physdev-in tapfake_dev '
mock.call.add_rule(
'sfake_dev', '-j DROP',
comment=ic.PAIR_DROP),
+ mock.call.add_rule(
+ 'ofake_dev',
+ '-p udp -m udp --sport 68 --dport 67 -j RETURN',
+ comment=None),
mock.call.add_rule('ofake_dev', '-j $sfake_dev',
comment=None),
+ mock.call.add_rule(
+ 'ofake_dev',
+ '-p udp -m udp --sport 67 --dport 68 -j DROP',
+ comment=None),
+ mock.call.add_rule(
+ 'ofake_dev',
+ '-m state --state INVALID -j DROP',
+ comment=None),
+ mock.call.add_rule(
+ 'ofake_dev',
+ '-m state --state RELATED,ESTABLISHED -j RETURN',
+ comment=None),
+ mock.call.add_rule('ofake_dev', '-j $sg-fallback',
+ comment=None),
mock.call.add_rule('sg-chain', '-j ACCEPT')]
self.v4filter_inst.assert_has_calls(calls)
self.assertTrue(int_br.delete_arp_spoofing_protection.called)
self.assertFalse(int_br.install_arp_spoofing_protection.called)
- def test_arp_spoofing_basic_rule_setup_without_ip(self):
+ def test_arp_spoofing_basic_rule_setup(self):
vif = FakeVif()
fake_details = {'fixed_ips': []}
self.agent.prevent_arp_spoofing = True
self.assertEqual(
[mock.call(port=vif.ofport)],
int_br.delete_arp_spoofing_protection.mock_calls)
- self.assertFalse(int_br.install_arp_spoofing_protection.called)
-
- def test_arp_spoofing_basic_rule_setup_fixed_ip(self):
- vif = FakeVif()
- fake_details = {'fixed_ips': [{'ip_address': '192.168.44.100'}]}
- self.agent.prevent_arp_spoofing = True
- int_br = mock.create_autospec(self.agent.int_br)
- self.agent.setup_arp_spoofing_protection(int_br, vif, fake_details)
self.assertEqual(
- [mock.call(port=vif.ofport)],
- int_br.delete_arp_spoofing_protection.mock_calls)
- self.assertTrue(int_br.install_arp_spoofing_protection.called)
+ [mock.call(ip_addresses=set(), port=vif.ofport)],
+ int_br.install_arp_spoofing_protection.mock_calls)
def test_arp_spoofing_fixed_and_allowed_addresses(self):
vif = FakeVif()