if ((fixed_ip['ip_address'] == address_pair['ip_address'])
and (port['mac_address'] ==
address_pair['mac_address'])):
- #TODO(arosen) - need to query for address pairs
- # to check for same condition if fixed_ips change to
- # be an address pair.
raise addr_pair.AddressPairMatchesPortFixedIPAndMac()
db_pair = AllowedAddressPair(
port_id=port['id'],
return allowed_address_pairs
+ def _check_fixed_ips_and_address_pairs_no_overlap(self, context, port):
+ address_pairs = self.get_allowed_address_pairs(context, port['id'])
+ for fixed_ip in port['fixed_ips']:
+ for address_pair in address_pairs:
+ if (fixed_ip['ip_address'] == address_pair['ip_address']
+ and port['mac_address'] == address_pair['mac_address']):
+ raise addr_pair.AddressPairMatchesPortFixedIPAndMac()
+
def get_allowed_address_pairs(self, context, port_id):
pairs = (context.session.query(AllowedAddressPair).
filter_by(port_id=port_id))
need_port_update_notify = False
session = context.session
+ changed_fixed_ips = 'fixed_ips' in port['port']
with session.begin(subtransactions=True):
original_port = super(Ml2Plugin, self).get_port(context, id)
updated_port = super(Ml2Plugin, self).update_port(context, id,
context, updated_port,
port['port'][addr_pair.ADDRESS_PAIRS])
need_port_update_notify = True
+ elif changed_fixed_ips:
+ self._check_fixed_ips_and_address_pairs_no_overlap(
+ context, updated_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, original_port, updated_port)
network = self.get_network(context, original_port['network_id'])
return port_data
def update_port(self, context, id, port):
+ changed_fixed_ips = 'fixed_ips' in port['port']
delete_security_groups = self._check_update_deletes_security_groups(
port)
has_security_groups = self._check_update_has_security_groups(port)
self._delete_allowed_address_pairs(context, id)
self._process_create_allowed_address_pairs(
context, ret_port, ret_port[addr_pair.ADDRESS_PAIRS])
+ elif changed_fixed_ips:
+ self._check_fixed_ips_and_address_pairs_no_overlap(context,
+ ret_port)
# checks if security groups were updated adding/modifying
# security groups, port security is set and port has ip
if not (has_ip and ret_port[psec.PORTSECURITY]):
def update_port(self, context, id, port):
session = context.session
need_port_update_notify = False
+ changed_fixed_ips = 'fixed_ips' in port['port']
with session.begin(subtransactions=True):
original_port = super(OVSNeutronPluginV2, self).get_port(
context, id)
context, updated_port,
port['port'][addr_pair.ADDRESS_PAIRS])
need_port_update_notify = True
-
+ elif changed_fixed_ips:
+ self._check_fixed_ips_and_address_pairs_no_overlap(
+ context, updated_port)
need_port_update_notify |= self.update_security_group_on_port(
context, id, port, original_port, updated_port)
self._process_portbindings_create_and_update(context,
return port['port']
def update_port(self, context, id, port):
+ changed_fixed_ips = 'fixed_ips' in port['port']
delete_addr_pairs = self._check_update_deletes_allowed_address_pairs(
port)
has_addr_pairs = self._check_update_has_allowed_address_pairs(port)
self._process_create_allowed_address_pairs(
context, ret_port,
ret_port[addr_pair.ADDRESS_PAIRS])
+ elif changed_fixed_ips:
+ self._check_fixed_ips_and_address_pairs_no_overlap(context,
+ ret_port)
return ret_port
address_pairs)
self._delete('ports', port['port']['id'])
+ def test_update_fixed_ip_to_address_pair_ip_fail(self):
+ with self.network() as net:
+ with self.subnet(network=net):
+ address_pairs = [{'ip_address': '10.0.0.65'}]
+ res = self._create_port(self.fmt, net['network']['id'],
+ arg_list=(addr_pair.ADDRESS_PAIRS,),
+ allowed_address_pairs=address_pairs)
+ port = self.deserialize(self.fmt, res)['port']
+ data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.65'}]}}
+ req = self.new_update_request('ports', data, port['id'])
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 400)
+ self._delete('ports', port['id'])
+
+ def test_update_fixed_ip_to_address_pair_with_mac_fail(self):
+ with self.network() as net:
+ with self.subnet(network=net):
+ res = self._create_port(self.fmt, net['network']['id'])
+ port = self.deserialize(self.fmt, res)['port']
+ address_pairs = [
+ {'mac_address': port['mac_address'],
+ 'ip_address': port['fixed_ips'][0]['ip_address']}]
+ data = {'port': {addr_pair.ADDRESS_PAIRS: address_pairs}}
+ req = self.new_update_request('ports', data, port['id'])
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 400)
+ self._delete('ports', port['id'])
+
def test_create_address_gets_port_mac(self):
with self.network() as net:
address_pairs = [{'ip_address': '23.23.23.23'}]