Current code does not allow assigning a fixed ip to a port when that ip
overlaps with one of the addresses in the allowed-addresspairs list.
This is an unnecessary check as the overlap does not have any negative
effect. Further, such a check actually makes it hard to use this
API. For example, if a fixed IP 10.10.1.1 exists on a port and we
want to allow addresses in 10.10.1.0/24 cidr on that port, then one
has to configure a list of 8 cidrs ([10.10.1.0/32, 10.10.1.2/31,
10.10.1.4/30, ..., 10.10.1.128/25]) on the allowed-addresspairs.
In addition to the above reasons, the current code also does not
check for the overlaps in all cases.
This patch summarily removes this overlap check.
Closes-Bug: #
1321864
Change-Id: I5498c4a72b31267644da10a54a9860c1fc3bb250
return allowed_address_pairs
- def _check_fixed_ips_and_address_pairs_no_overlap(self, context, port):
- address_pairs = self.get_allowed_address_pairs(context, port['id'])
- for fixed_ip in port['fixed_ips']:
- for address_pair in address_pairs:
- if (fixed_ip['ip_address'] == address_pair['ip_address']
- and port['mac_address'] == address_pair['mac_address']):
- raise addr_pair.AddressPairMatchesPortFixedIPAndMac()
-
def get_allowed_address_pairs(self, context, port_id):
pairs = (context.session.query(AllowedAddressPair).
filter_by(port_id=port_id))
ctrl_update_required |= (
self.update_address_pairs_on_port(context, port_id, port,
orig_port, new_port))
- if 'fixed_ips' in port['port']:
- self._check_fixed_ips_and_address_pairs_no_overlap(
- context, new_port)
self._update_extra_dhcp_opts_on_port(context, port_id, port,
new_port)
old_host_id = porttracker_db.get_port_hostid(context,
need_port_update_notify = False
session = context.session
- changed_fixed_ips = 'fixed_ips' in port['port']
with session.begin(subtransactions=True):
try:
port_db = (session.query(models_v2.Port).
self.update_address_pairs_on_port(context, id, port,
original_port,
updated_port))
- elif changed_fixed_ips:
- self._check_fixed_ips_and_address_pairs_no_overlap(
- context, updated_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, original_port, updated_port)
network = self.get_network(context, original_port['network_id'])
"id=%(id)s port=%(port)s ."),
{'id': id, 'port': port})
need_port_update_notify = False
- changed_fixed_ips = 'fixed_ips' in port['port']
with context.session.begin(subtransactions=True):
old_port = super(NECPluginV2, self).get_port(context, id)
new_port = super(NECPluginV2, self).update_port(context, id, port)
self.update_address_pairs_on_port(context, id, port,
old_port,
new_port))
- elif changed_fixed_ips:
- self._check_fixed_ips_and_address_pairs_no_overlap(
- context, new_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, old_port, new_port)
def update_port(self, context, id, port):
session = context.session
need_port_update_notify = False
- changed_fixed_ips = 'fixed_ips' in port['port']
with session.begin(subtransactions=True):
original_port = super(OVSNeutronPluginV2, self).get_port(
context, id)
self.update_address_pairs_on_port(context, id, port,
original_port,
updated_port))
- elif changed_fixed_ips:
- self._check_fixed_ips_and_address_pairs_no_overlap(
- context, updated_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, original_port, updated_port)
self._process_portbindings_create_and_update(context,
return port_data
def update_port(self, context, id, port):
- changed_fixed_ips = 'fixed_ips' in port['port']
delete_security_groups = self._check_update_deletes_security_groups(
port)
has_security_groups = self._check_update_has_security_groups(port)
self._delete_allowed_address_pairs(context, id)
self._process_create_allowed_address_pairs(
context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS])
- elif changed_fixed_ips:
- self._check_fixed_ips_and_address_pairs_no_overlap(context,
- ret_port)
# checks if security groups were updated adding/modifying
# security groups, port security is set and port has ip
if not (has_ip and ret_port[psec.PORTSECURITY]):
return port['port']
def update_port(self, context, id, port):
- changed_fixed_ips = 'fixed_ips' in port['port']
delete_addr_pairs = self._check_update_deletes_allowed_address_pairs(
port)
has_addr_pairs = self._check_update_has_allowed_address_pairs(port)
self._process_create_allowed_address_pairs(
context, ret_port,
ret_port[addr_pair.ADDRESS_PAIRS])
- elif changed_fixed_ips:
- self._check_fixed_ips_and_address_pairs_no_overlap(context,
- ret_port)
return ret_port
address_pairs)
self._delete('ports', port['port']['id'])
- def test_update_fixed_ip_to_address_pair_ip_fail(self):
- with self.network() as net:
- with self.subnet(network=net):
- address_pairs = [{'ip_address': '10.0.0.65'}]
- res = self._create_port(self.fmt, net['network']['id'],
- arg_list=(addr_pair.ADDRESS_PAIRS,),
- allowed_address_pairs=address_pairs)
- port = self.deserialize(self.fmt, res)['port']
- data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.65'}]}}
- req = self.new_update_request('ports', data, port['id'])
- res = req.get_response(self.api)
- self.assertEqual(res.status_int, 400)
- self._delete('ports', port['id'])
-
- def test_update_fixed_ip_to_address_pair_with_mac_fail(self):
- with self.network() as net:
- with self.subnet(network=net):
- res = self._create_port(self.fmt, net['network']['id'])
- port = self.deserialize(self.fmt, res)['port']
- address_pairs = [
- {'mac_address': port['mac_address'],
- 'ip_address': port['fixed_ips'][0]['ip_address']}]
- data = {'port': {addr_pair.ADDRESS_PAIRS: address_pairs}}
- req = self.new_update_request('ports', data, port['id'])
- res = req.get_response(self.api)
- self.assertEqual(res.status_int, 400)
- self._delete('ports', port['id'])
-
def test_create_address_gets_port_mac(self):
with self.network() as net:
address_pairs = [{'ip_address': '23.23.23.23'}]
port['mac_address'])
self._delete('ports', port['id'])
- def test_update_address_pair_to_match_fixed_ip_and_mac(self):
- with self.network() as net:
- with self.subnet(network=net):
- res = self._create_port(self.fmt, net['network']['id'])
- port = self.deserialize(self.fmt, res)['port']
- address_pairs = [{'mac_address': port['mac_address'],
- 'ip_address':
- port['fixed_ips'][0]['ip_address']}]
-
- update_port = {'port': {addr_pair.ADDRESS_PAIRS:
- address_pairs}}
- req = self.new_update_request('ports', update_port,
- port['id'])
- res = req.get_response(self.api)
- self.assertEqual(res.status_int, 400)
- self._delete('ports', port['id'])
-
def test_update_port_security_off_address_pairs(self):
if self._skip_port_security:
self.skipTest("Plugin does not implement port-security extension")