LOG = logging.getLogger(__name__)
-IP_MASK = {q_const.IPv4: 32,
- q_const.IPv6: 128}
-
-
DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
'egress': 'dest_ip_prefix'}
+DHCP_RULE_PORT = {4: (67, 68, q_const.IPv4), 6: (547, 546, q_const.IPv6)}
+
class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
"""Mixin class to add agent-based security group implementation."""
ips[network_id] = []
for port, ip in query:
- ips[port['network_id']].append(ip)
+ if (netaddr.IPAddress(ip).version == 6
+ and not netaddr.IPAddress(ip).is_link_local()):
+ mac_address = port['mac_address']
+ ip = str(ipv6.get_ipv6_addr_by_EUI64(q_const.IPV6_LLA_PREFIX,
+ mac_address))
+ if ip not in ips[port['network_id']]:
+ ips[port['network_id']].append(ip)
+
return ips
def _select_ra_ips_for_network_ids(self, context, network_ids):
def _add_ingress_dhcp_rule(self, port, ips):
dhcp_ips = ips.get(port['network_id'])
for dhcp_ip in dhcp_ips:
- if not netaddr.IPAddress(dhcp_ip).version == 4:
- return
-
+ source_port, dest_port, ethertype = DHCP_RULE_PORT[
+ netaddr.IPAddress(dhcp_ip).version]
dhcp_rule = {'direction': 'ingress',
- 'ethertype': q_const.IPv4,
+ 'ethertype': ethertype,
'protocol': 'udp',
- 'port_range_min': 68,
- 'port_range_max': 68,
- 'source_port_range_min': 67,
- 'source_port_range_max': 67}
- dhcp_rule['source_ip_prefix'] = "%s/%s" % (dhcp_ip,
- IP_MASK[q_const.IPv4])
+ 'port_range_min': dest_port,
+ 'port_range_max': dest_port,
+ 'source_port_range_min': source_port,
+ 'source_port_range_max': source_port,
+ 'source_ip_prefix': dhcp_ip}
port['security_group_rules'].append(dhcp_rule)
def _add_ingress_ra_rule(self, port, ips):
# The mock interferes with HTTP(S) connection caching
cfg.CONF.set_override('cache_connections', False, 'RESTPROXY')
cfg.CONF.set_override('service_plugins', ['bigswitch_l3'])
+ cfg.CONF.set_override('add_meta_server_route', False, 'RESTPROXY')
def setup_patches(self):
self.plugin_notifier_p = mock.patch(NOTIFIER)
FAKE_IP = {const.IPv4: '10.0.0.1',
const.IPv6: 'fe80::1',
'IPv6_GLOBAL': '2001:0db8::1',
- 'IPv6_LLA': 'fe80::123'}
+ 'IPv6_LLA': 'fe80::123',
+ 'IPv6_DHCP': '2001:db8::3'}
TEST_PLUGIN_CLASS = ('neutron.tests.unit.test_security_groups_rpc.'
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ dhcp_port = self._create_port(
+ self.fmt, n['network']['id'],
+ fixed_ips=[{'subnet_id': subnet_v6['subnet']['id'],
+ 'ip_address': FAKE_IP['IPv6_DHCP']}],
+ device_owner=const.DEVICE_OWNER_DHCP,
+ security_groups=[sg1_id])
+ dhcp_rest = self.deserialize(self.fmt, dhcp_port)
+ dhcp_mac = dhcp_rest['port']['mac_address']
+ dhcp_lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
+ const.IPV6_LLA_PREFIX,
+ dhcp_mac))
+
res1 = self._create_port(
self.fmt, n['network']['id'],
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
port_rpc = ports_rpc[port_id1]
+ source_port, dest_port, ethertype = sg_db_rpc.DHCP_RULE_PORT[6]
expected = [{'direction': 'egress', 'ethertype': const.IPv4,
'security_group_id': sg1_id},
{'direction': 'egress', 'ethertype': const.IPv6,
'ethertype': const.IPv6,
'source_ip_prefix': fake_gateway,
'source_port_range_min': const.ICMPV6_TYPE_RA},
+ {'direction': 'ingress',
+ 'ethertype': ethertype,
+ 'port_range_max': dest_port,
+ 'port_range_min': dest_port,
+ 'protocol': const.PROTO_NAME_UDP,
+ 'source_ip_prefix': dhcp_lla_ip,
+ 'source_port_range_max': source_port,
+ 'source_port_range_min': source_port}
]
self.assertEqual(port_rpc['security_group_rules'],
expected)