exceptions.InUse: webob.exc.HTTPConflict,
exceptions.MacAddressGenerationFailure:
webob.exc.HTTPServiceUnavailable,
- exceptions.StateInvalid: webob.exc.HTTPBadRequest}
+ exceptions.StateInvalid: webob.exc.HTTPBadRequest,
+ exceptions.InvalidInput: webob.exc.HTTPBadRequest}
def fields(request):
'default': True},
'mac_address': {'allow_post': True, 'allow_put': False,
'default': ATTR_NOT_SPECIFIED},
- 'fixed_ips_v4': {'allow_post': True, 'allow_put': True,
- 'default': ATTR_NOT_SPECIFIED},
- 'fixed_ips_v6': {'allow_post': True, 'allow_put': True,
- 'default': ATTR_NOT_SPECIFIED},
+ 'fixed_ips': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'device_id': {'allow_post': True, 'allow_put': True, 'default': ''},
"There is one or more attachments plugged into its ports.")
+class SubnetInUse(InUse):
+ message = _("Unable to complete operation on subnet %(subnet_id)s. "
+ "There is used by one or more ports.")
+
+
class PortInUse(InUse):
message = _("Unable to complete operation on port %(port_id)s "
"for network %(net_id)s. The attachment '%(att_id)s"
"The mac address %(mac)s is in use.")
+class IpAddressInUse(InUse):
+ message = _("Unable to complete operation for network %(net_id)s. "
+ "The IP address %(ip_address)s is in use.")
+
+
class AlreadyAttached(QuantumException):
message = _("Unable to plug the attachment %(att_id)s into port "
"%(port_id)s for network %(net_id)s. The attachment is "
pass
+class InvalidInput(QuantumException):
+ message = _("Invalid input for operation: %(error_message)s.")
+
+
class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
class MacAddressGenerationFailure(QuantumException):
message = _("Unable to generate unique mac on network %(net_id)s.")
+
+
+class IpAddressGenerationFailure(QuantumException):
+ message = _("No more IP addresses available on network %(net_id)s.")
return True
return False
+ @staticmethod
+ def _recycle_ip(context, network_id, subnet_id, port_id, ip_address):
+ """Return an IP address to the pool of free IP's on the network
+ subnet.
+ """
+ range_qry = context.session.query(models_v2.IPAllocationRange)
+ # Two requests will be done on the database. The first will be to
+ # search if an entry starts with ip_address + 1 (r1). The second
+ # will be to see if an entry ends with ip_address -1 (r2).
+ # If 1 of the above holds true then the specific entry will be
+ # modified. If both hold true then the two ranges will be merged.
+ # If there are no entries then a single entry will be added.
+ ip_first = str(netaddr.IPAddress(ip_address) + 1)
+ ip_last = str(netaddr.IPAddress(ip_address) - 1)
+ LOG.debug("Recycle %s", ip_address)
+
+ try:
+ r1 = range_qry.filter_by(subnet_id=subnet_id,
+ first_ip=ip_first).one()
+ LOG.debug("Recycle: first match for %s-%s", r1['first_ip'],
+ r1['last_ip'])
+ except exc.NoResultFound:
+ r1 = []
+ try:
+ r2 = range_qry.filter_by(subnet_id=subnet_id,
+ last_ip=ip_last).one()
+ LOG.debug("Recycle: last match for %s-%s", r2['first_ip'],
+ r2['last_ip'])
+ except exc.NoResultFound:
+ r2 = []
+
+ if r1 and r2:
+ # Merge the two ranges
+ ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
+ first_ip=r2['first_ip'],
+ last_ip=r1['last_ip'])
+ context.session.add(ip_range)
+ LOG.debug("Recycle: merged %s-%s and %s-%s", r2['first_ip'],
+ r2['last_ip'], r1['first_ip'], r1['last_ip'])
+ context.session.delete(r1)
+ context.session.delete(r2)
+ elif r1:
+ # Update the range with matched first IP
+ r1['first_ip'] = ip_address
+ LOG.debug("Recycle: updated first %s-%s", r1['first_ip'],
+ r1['last_ip'])
+ elif r2:
+ # Update the range with matched last IP
+ r2['last_ip'] = ip_address
+ LOG.debug("Recycle: updated last %s-%s", r2['first_ip'],
+ r2['last_ip'])
+ else:
+ # Create a new range
+ ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
+ first_ip=ip_address,
+ last_ip=ip_address)
+ context.session.add(ip_range)
+ LOG.debug("Recycle: created new %s-%s", ip_address, ip_address)
+
+ # Delete the IP address from the IPAllocate table
+ LOG.debug("Delete allocated IP %s (%s/%s/%s)", ip_address,
+ network_id, subnet_id, port_id)
+ alloc_qry = context.session.query(models_v2.IPAllocation)
+ allocated = alloc_qry.filter_by(network_id=network_id,
+ port_id=port_id,
+ ip_address=ip_address,
+ subnet_id=subnet_id).delete()
+
+ @staticmethod
+ def _generate_ip(context, network_id, subnets):
+ """Generate an IP address.
+
+ The IP address will be generated from one of the subnets defined on
+ the network.
+ """
+ range_qry = context.session.query(models_v2.IPAllocationRange)
+ for subnet in subnets:
+ range = range_qry.filter_by(subnet_id=subnet['id']).first()
+ if not range:
+ LOG.debug("All IP's from subnet %s (%s) allocated",
+ subnet['id'], subnet['cidr'])
+ continue
+ ip_address = range['first_ip']
+ LOG.debug("Allocated IP - %s from %s to %s", ip_address,
+ range['first_ip'], range['last_ip'])
+ if range['first_ip'] == range['last_ip']:
+ # No more free indices on subnet => delete
+ LOG.debug("No more free IP's in slice. Deleting allocation "
+ "pool.")
+ context.session.delete(range)
+ else:
+ # increment the first free
+ range['first_ip'] = str(netaddr.IPAddress(ip_address) + 1)
+ return {'ip_address': ip_address, 'subnet_id': subnet['id']}
+ raise q_exc.IpAddressGenerationFailure(net_id=network_id)
+
+ @staticmethod
+ def _allocate_specific_ip(context, subnet_id, ip_address):
+ """Allocate a specific IP address on the subnet."""
+ ip = int(netaddr.IPAddress(ip_address))
+ range_qry = context.session.query(models_v2.IPAllocationRange)
+ ranges = range_qry.filter_by(subnet_id=subnet_id).all()
+ for range in ranges:
+ first = int(netaddr.IPAddress(range['first_ip']))
+ last = int(netaddr.IPAddress(range['last_ip']))
+ if first <= ip <= last:
+ if first == last:
+ context.session.delete(range)
+ return
+ elif first == ip:
+ range['first_ip'] = str(netaddr.IPAddress(ip_address) + 1)
+ return
+ elif last == ip:
+ range['last_ip'] = str(netaddr.IPAddress(ip_address) - 1)
+ return
+ else:
+ # Split into two ranges
+ new_first = str(netaddr.IPAddress(ip_address) + 1)
+ new_last = range['last_ip']
+ range['last_ip'] = str(netaddr.IPAddress(ip_address) - 1)
+ ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
+ first_ip=new_first,
+ last_ip=new_last)
+ context.session.add(ip_range)
+ return
+
+ @staticmethod
+ def _check_unique_ip(context, network_id, subnet_id, ip_address):
+ """Validate that the IP address on the subnet is not in use."""
+ ip_qry = context.session.query(models_v2.IPAllocation)
+ try:
+ ip_qry.filter_by(network_id=network_id,
+ subnet_id=subnet_id,
+ ip_address=ip_address).one()
+ except exc.NoResultFound:
+ return True
+ return False
+
+ @staticmethod
+ def _check_subnet_ip(cidr, ip_address):
+ """Validate that the IP address is on the subnet."""
+ ip = netaddr.IPAddress(ip_address)
+ net = netaddr.IPNetwork(cidr)
+ # Check that the IP is valid on subnet. This cannot be the
+ # network or the broadcast address
+ if (ip != net.network and
+ ip != net.broadcast and
+ net.netmask & ip == net.ip):
+ return True
+ return False
+
+ def _test_fixed_ips_for_port(self, context, network_id, fixed_ips):
+ """Test fixed IPs for port.
+
+ Check that configured subnets are valid prior to allocating any
+ IPs. Include the subnet_id in the result if only an IP address is
+ configured.
+
+ :raises: InvalidInput, IpAddressInUse
+ """
+ fixed_ip_set = []
+ for fixed in fixed_ips:
+ found = False
+ if 'subnet_id' not in fixed:
+ if 'ip_address' not in fixed:
+ msg = _('IP allocation requires subnet_id or ip_address')
+ raise q_exc.InvalidInput(error_message=msg)
+
+ filter = {'network_id': [network_id]}
+ subnets = self.get_subnets(context, filters=filter)
+ for subnet in subnets:
+ if QuantumDbPluginV2._check_subnet_ip(subnet['cidr'],
+ fixed['ip_address']):
+ found = True
+ subnet_id = subnet['id']
+ break
+ if not found:
+ msg = _('IP address %s is not a valid IP for the defined '
+ 'networks subnets') % fixed['ip_address']
+ raise q_exc.InvalidInput(error_message=msg)
+ else:
+ subnet = self._get_subnet(context, fixed['subnet_id'])
+ subnet_id = subnet['id']
+
+ if 'ip_address' in fixed:
+ # Ensure that the IP's are unique
+ if not QuantumDbPluginV2._check_unique_ip(context, network_id,
+ subnet_id,
+ fixed['ip_address']):
+ raise q_exc.IpAddressInUse(net_id=network_id,
+ ip_address=fixed['ip_address'])
+
+ # Ensure that the IP is valid on the subnet
+ if (not found and
+ not QuantumDbPluginV2._check_subnet_ip(
+ subnet['cidr'], fixed['ip_address'])):
+ msg = _('IP address %s is not a valid IP for the defined '
+ 'subnet') % fixed['ip_address']
+ raise q_exc.InvalidInput(error_message=msg)
+
+ fixed_ip_set.append({'subnet_id': subnet_id,
+ 'ip_address': fixed['ip_address']})
+ else:
+ fixed_ip_set.append({'subnet_id': subnet_id})
+ return fixed_ip_set
+
+ def _allocate_fixed_ips(self, context, network, fixed_ips):
+ """Allocate IP addresses according to the configured fixed_ips."""
+ ips = []
+ for fixed in fixed_ips:
+ if 'ip_address' in fixed:
+ # Remove the IP address from the allocation pool
+ QuantumDbPluginV2._allocate_specific_ip(
+ context, fixed['subnet_id'], fixed['ip_address'])
+ ips.append({'ip_address': fixed['ip_address'],
+ 'subnet_id': fixed['subnet_id']})
+ # Only subnet ID is specified => need to generate IP
+ # from subnet
+ else:
+ subnets = [self._get_subnet(context, fixed['subnet_id'])]
+ # IP address allocation
+ result = self._generate_ip(context, network, subnets)
+ ips.append({'ip_address': result['ip_address'],
+ 'subnet_id': result['subnet_id']})
+ return ips
+
+ def _update_ips_for_port(self, context, network_id, port_id, original_ips,
+ new_ips):
+ """Add or remove IPs from the port."""
+ ips = []
+ # Remove all of the intersecting elements
+ for original_ip in original_ips[:]:
+ for new_ip in new_ips[:]:
+ if 'ip_address' in new_ip:
+ if (original_ip['ip_address'] == new_ip['ip_address']
+ and
+ original_ip['subnet_id'] == new_ip['subnet_id']):
+ original_ips.remove(original_ip)
+ new_ips.remove(new_ip)
+
+ # Check if the IP's to add are OK
+ to_add = self._test_fixed_ips_for_port(context, network_id, new_ips)
+ for ip in original_ips:
+ LOG.debug("Port update. Deleting %s", ip)
+ QuantumDbPluginV2._recycle_ip(context,
+ network_id=network_id,
+ subnet_id=ip['subnet_id'],
+ ip_address=ip['ip_address'],
+ port_id=port_id)
+
+ if to_add:
+ LOG.debug("Port update. Adding %s", to_add)
+ network = self._get_network(context, network_id)
+ ips = self._allocate_fixed_ips(context, network, to_add)
+ return ips
+
+ def _allocate_ips_for_port(self, context, network, port):
+ """Allocate IP addresses for the port.
+
+ If port['fixed_ips'] is set to 'ATTR_NOT_SPECIFIED', allocate IP
+ addresses for the port. If port['fixed_ips'] contains an IP address or
+ a subnet_id then allocate an IP address accordingly.
+ """
+ p = port['port']
+ ips = []
+
+ fixed_configured = (p['fixed_ips'] != api_router.ATTR_NOT_SPECIFIED)
+ if fixed_configured:
+ configured_ips = self._test_fixed_ips_for_port(context,
+ p["network_id"],
+ p['fixed_ips'])
+ ips = self._allocate_fixed_ips(context, network, configured_ips)
+ else:
+ filter = {'network_id': [p['network_id']]}
+ subnets = self.get_subnets(context, filters=filter)
+ # Split into v4 and v6 subnets
+ v4 = []
+ v6 = []
+ for subnet in subnets:
+ if subnet['ip_version'] == 4:
+ v4.append(subnet)
+ else:
+ v6.append(subnet)
+ version_subnets = [v4, v6]
+ for subnets in version_subnets:
+ if subnets:
+ result = QuantumDbPluginV2._generate_ip(context, network,
+ subnets)
+ ips.append({'ip_address': result['ip_address'],
+ 'subnet_id': result['subnet_id']})
+ return ips
+
def _make_network_dict(self, network, fields=None):
res = {'id': network['id'],
'name': network['name'],
"mac_address": port["mac_address"],
"admin_state_up": port["admin_state_up"],
"status": port["status"],
- "fixed_ips": [ip["address"] for ip in port["fixed_ips"]],
+ "fixed_ips": [{'subnet_id': ip["subnet_id"],
+ 'ip_address': ip["ip_address"]}
+ for ip in port["fixed_ips"]],
"device_id": port["device_id"]}
return self._fields(res, fields)
def create_subnet(self, context, subnet):
s = subnet['subnet']
+ net = netaddr.IPNetwork(s['cidr'])
if s['gateway_ip'] == api_router.ATTR_NOT_SPECIFIED:
- net = netaddr.IPNetwork(s['cidr'])
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
+ ip = netaddr.IPAddress(s['gateway_ip'])
+ # Get the first and last indices for the subnet
+ ranges = []
+ # Gateway is the first address in the range
+ if ip == net.network + 1:
+ range = {'first': str(ip + 1),
+ 'last': str(net.broadcast - 1)}
+ ranges.append(range)
+ # Gateway is the last address in the range
+ elif ip == net.broadcast - 1:
+ range = {'first': str(net.network + 1),
+ 'last': str(ip - 1)}
+ ranges.append(range)
+ # Gateway is on IP in the subnet
+ else:
+ range = {'first': str(net.network + 1),
+ 'last': str(ip - 1)}
+ ranges.append(range)
+ range = {'first': str(ip + 1),
+ 'last': str(net.broadcast - 1)}
+ ranges.append(range)
with context.session.begin():
network = self._get_network(context, s["network_id"])
subnet = models_v2.Subnet(network_id=s['network_id'],
ip_version=s['ip_version'],
cidr=s['cidr'],
gateway_ip=s['gateway_ip'])
-
context.session.add(subnet)
+
+ with context.session.begin():
+ for range in ranges:
+ ip_range = models_v2.IPAllocationRange(subnet_id=subnet.id,
+ first_ip=range['first'],
+ last_ip=range['last'])
+ context.session.add(ip_range)
return self._make_subnet_dict(subnet)
def update_subnet(self, context, id, subnet):
def delete_subnet(self, context, id):
with context.session.begin():
subnet = self._get_subnet(context, id)
-
- allocations_qry = context.session.query(models_v2.IPAllocation)
- allocations_qry.filter_by(subnet_id=id).delete()
-
+ # Check if ports are using this subnet
+ allocated_qry = context.session.query(models_v2.IPAllocation)
+ allocated = allocated_qry.filter_by(port_id=id).all()
+ if allocated:
+ raise q_exc.SubnetInUse(subnet_id=id)
context.session.delete(subnet)
def get_subnet(self, context, id, fields=None, verbose=None):
raise q_exc.MacAddressInUse(net_id=p["network_id"],
mac=p['mac_address'])
+ # Returns the IP's for the port
+ ips = self._allocate_ips_for_port(context, network, port)
+
port = models_v2.Port(tenant_id=tenant_id,
network_id=p['network_id'],
mac_address=p['mac_address'],
device_id=p['device_id'])
context.session.add(port)
- # TODO(anyone) ip allocation
- #for subnet in network["subnets"]:
- # pass
+ # Update the allocated IP's
+ if ips:
+ with context.session.begin():
+ for ip in ips:
+ LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'],
+ port['network_id'], ip['subnet_id'], port.id)
+ allocated = models_v2.IPAllocation(
+ network_id=port['network_id'],
+ port_id=port.id,
+ ip_address=ip['ip_address'],
+ subnet_id=ip['subnet_id'])
+ context.session.add(allocated)
return self._make_port_dict(port)
def update_port(self, context, id, port):
p = port['port']
+
with context.session.begin():
port = self._get_port(context, id)
+ # Check if the IPs need to be updated
+ if 'fixed_ips' in p:
+ original = self._make_port_dict(port)
+ ips = self._update_ips_for_port(context,
+ port["network_id"],
+ id,
+ original["fixed_ips"],
+ p['fixed_ips'])
+ # 'fixed_ip's not part of DB so it is deleted
+ del p['fixed_ips']
+
+ # Update ips if necessary
+ for ip in ips:
+ allocated = models_v2.IPAllocation(
+ network_id=port['network_id'], port_id=port.id,
+ ip_address=ip['ip_address'], subnet_id=ip['subnet_id'])
+ context.session.add(allocated)
+
port.update(p)
+
return self._make_port_dict(port)
def delete_port(self, context, id):
with context.session.begin():
port = self._get_port(context, id)
- allocations_qry = context.session.query(models_v2.IPAllocation)
- allocations_qry.filter_by(port_id=id).delete()
-
+ allocated_qry = context.session.query(models_v2.IPAllocation)
+ # recycle all of the IP's
+ # NOTE(garyk) this may be have to be addressed differently when
+ # working with a DHCP server.
+ allocated = allocated_qry.filter_by(port_id=id).all()
+ if allocated:
+ for a in allocated:
+ # Gateway address will not be recycled
+ subnet = self._get_subnet(context, a['subnet_id'])
+ if a['ip_address'] == subnet['gateway_ip']:
+ LOG.debug("Gateway address (%s/%s) is not recycled",
+ a['ip_address'], a['subnet_id'])
+ continue
+
+ QuantumDbPluginV2._recycle_ip(context,
+ network_id=a['network_id'],
+ subnet_id=a['subnet_id'],
+ ip_address=a['ip_address'],
+ port_id=id)
context.session.delete(port)
def get_port(self, context, id, fields=None, verbose=None):
return self._make_port_dict(port, fields)
def get_ports(self, context, filters=None, fields=None, verbose=None):
- return self._get_collection(context, models_v2.Port,
- self._make_port_dict,
- filters=filters, fields=fields,
- verbose=verbose)
+ fixed_ips = filters.pop('fixed_ips', [])
+ ports = self._get_collection(context, models_v2.Port,
+ self._make_port_dict,
+ filters=filters, fields=fields,
+ verbose=verbose)
+ if ports and fixed_ips:
+ filtered_ports = []
+ for port in ports:
+ if port['fixed_ips']:
+ ips = port['fixed_ips']
+ for fixed in fixed_ips:
+ found = False
+ # Convert to dictionary (deserialize)
+ fixed = eval(fixed)
+ for ip in ips:
+ if 'ip_address' in fixed and 'subnet_id' in fixed:
+ if (ip['ip_address'] == fixed['ip_address'] and
+ ip['subnet_id'] == fixed['subnet_id']):
+ found = True
+ elif 'ip_address' in fixed:
+ if ip['ip_address'] == fixed['ip_address']:
+ found = True
+ elif 'subnet_id' in fixed:
+ if ip['subnet_id'] == fixed['subnet_id']:
+ found = True
+ if found:
+ filtered_ports.append(port)
+ break
+ if found:
+ break
+ return filtered_ports
+ return ports
tenant_id = sa.Column(sa.String(255))
+class IPAllocationRange(model_base.BASEV2):
+ """Internal representation of a free IP address range in a Quantum
+ subnet. The range of available ips is [first_ip..last_ip]. The
+ allocation retrieves the first entry from the range. If the first
+ entry is equal to the last entry then this row will be deleted.
+ Recycling ips involves appending to existing ranges. This is
+ only done if the range is contiguous. If not, the first_ip will be
+ the same as the last_ip. When adjacent ips are recycled the ranges
+ will be merged.
+ """
+ subnet_id = sa.Column(sa.String(36), sa.ForeignKey('subnets.id'),
+ nullable=True)
+ first_ip = sa.Column(sa.String(64), nullable=False)
+ last_ip = sa.Column(sa.String(64), nullable=False)
+
+
class IPAllocation(model_base.BASEV2):
- """Internal representation of a IP address allocation in a Quantum
- subnet
+ """Internal representation of allocated IP addresses in a Quantum subnet.
"""
- port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'))
- address = sa.Column(sa.String(16), nullable=False, primary_key=True)
+ port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'),
+ nullable=False, primary_key=True)
+ ip_address = sa.Column(sa.String(64), nullable=False, primary_key=True)
subnet_id = sa.Column(sa.String(36), sa.ForeignKey('subnets.id'),
- primary_key=True)
- allocated = sa.Column(sa.Boolean(), nullable=False)
+ nullable=False, primary_key=True)
+ network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
+ nullable=False, primary_key=True)
class Port(model_base.BASEV2, HasTenant):
- """Represents a port on a quantum v2 network"""
+ """Represents a port on a quantum v2 network."""
network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
nullable=False)
- fixed_ips = orm.relationship(IPAllocation, backref='ports')
+ fixed_ips = orm.relationship(IPAllocation, backref='ports', lazy="dynamic")
mac_address = sa.Column(sa.String(32), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
status = sa.Column(sa.String(16), nullable=False)
class Subnet(model_base.BASEV2):
- """Represents a quantum subnet"""
+ """Represents a quantum subnet.
+
+ When a subnet is created the first and last entries will be created. These
+ are used for the IP allocation.
+ """
network_id = sa.Column(sa.String(36), sa.ForeignKey('networks.id'))
- allocations = orm.relationship(IPAllocation,
- backref=orm.backref('subnet',
- uselist=False))
ip_version = sa.Column(sa.Integer, nullable=False)
cidr = sa.Column(sa.String(64), nullable=False)
- gateway_ip = sa.Column(sa.String(255))
+ gateway_ip = sa.Column(sa.String(64))
#TODO(danwent):
# - dns_namservers
class Network(model_base.BASEV2, HasTenant):
- """Represents a v2 quantum network"""
+ """Represents a v2 quantum network."""
name = sa.Column(sa.String(255))
ports = orm.relationship(Port, backref='networks')
subnets = orm.relationship(Subnet, backref='networks')
'admin_state_up': True}}
full_input = {'port': {'admin_state_up': True,
'mac_address': router.ATTR_NOT_SPECIFIED,
- 'fixed_ips_v4': router.ATTR_NOT_SPECIFIED,
- 'fixed_ips_v6': router.ATTR_NOT_SPECIFIED,
+ 'fixed_ips': router.ATTR_NOT_SPECIFIED,
'host_routes': router.ATTR_NOT_SPECIFIED}}
full_input['port'].update(initial_input['port'])
return_value = {'id': _uuid(), 'status': 'ACTIVE',
'admin_state_up': True,
'mac_address': 'ca:fe:de:ad:be:ef',
- 'fixed_ips_v4': ['10.0.0.0/24'],
- 'fixed_ips_v6': [],
'host_routes': [],
'device_id': device_id}
return_value.update(initial_input['port'])
import logging
import mock
import os
+import random
import unittest
import quantum
network_req = self.new_create_request('networks', data, fmt)
return network_req.get_response(self.api)
- def _create_subnet(self, fmt, net_id, gateway_ip, cidr):
+ def _create_subnet(self, fmt, net_id, gateway_ip, cidr, ip_version=4):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
- 'ip_version': 4}}
+ 'ip_version': ip_version}}
if gateway_ip:
data['subnet']['gateway_ip'] = gateway_ip
content_type = 'application/' + fmt
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
- for arg in ('admin_state_up', 'device_id', 'mac_address',
- 'fixed_ips_v4', 'fixed_ips_v6'):
+ for arg in ('admin_state_up', 'device_id', 'mac_address', 'fixed_ips'):
if arg in kwargs:
data['port'][arg] = kwargs[arg]
port_req = self.new_create_request('ports', data, fmt)
return port_req.get_response(self.api)
- def _make_subnet(self, fmt, network, gateway, cidr):
+ def _make_subnet(self, fmt, network, gateway, cidr, ip_version=4):
res = self._create_subnet(fmt, network['network']['id'],
- gateway, cidr)
+ gateway, cidr, ip_version)
return self.deserialize(fmt, res)
def _make_port(self, fmt, net_id, **kwargs):
port = self._make_port(fmt, net_id)
yield port
self._delete('ports', port['port']['id'])
+ else:
+ net_id = subnet['subnet']['network_id']
+ port = self._make_port(fmt, net_id)
+ yield port
+ self._delete('ports', port['port']['id'])
class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
for k, v in keys:
self.assertEquals(port['port'][k], v)
self.assertTrue('mac_address' in port['port'])
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2):
res = req.get_response(self.api)
self.assertEquals(res.status_int, 409)
+ def test_update_port_delete_ip(self):
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ data = {'port': {'admin_state_up': False,
+ 'fixed_ips': []}}
+ req = self.new_update_request('ports',
+ data, port['port']['id'])
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(res['port']['admin_state_up'],
+ data['port']['admin_state_up'])
+ self.assertEqual(res['port']['fixed_ips'],
+ data['port']['fixed_ips'])
+
+ def test_update_port_update_ip(self):
+ """Test update of port IP.
+
+ Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
+ """
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ data = {'port': {'fixed_ips': [{'subnet_id':
+ subnet['subnet']['id'],
+ 'ip_address': "10.0.0.10"}]}}
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+ res = self.deserialize('json', req.get_response(self.api))
+ ips = res['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.10')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+
+ def test_update_port_update_ips(self):
+ """Update IP and generate new IP on port.
+
+ Check a port update with the specified subnet_id's. A IP address
+ will be allocated for each subnet_id.
+ """
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ data = {'port': {'admin_state_up': False,
+ 'fixed_ips': [{'subnet_id':
+ subnet['subnet']['id']}]}}
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(res['port']['admin_state_up'],
+ data['port']['admin_state_up'])
+ ips = res['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+
+ def test_update_port_add_additional_ip(self):
+ """Test update of port with additional IP."""
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ data = {'port': {'admin_state_up': False,
+ 'fixed_ips': [{'subnet_id':
+ subnet['subnet']['id']},
+ {'subnet_id':
+ subnet['subnet']['id']}]}}
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(res['port']['admin_state_up'],
+ data['port']['admin_state_up'])
+ ips = res['port']['fixed_ips']
+ self.assertEquals(len(ips), 2)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ self.assertEquals(ips[1]['ip_address'], '10.0.0.3')
+ self.assertEquals(ips[1]['subnet_id'], subnet['subnet']['id'])
+
def test_requested_duplicate_mac(self):
fmt = 'json'
with self.port() as port:
res = self._create_port(fmt, net_id=net_id)
self.assertEquals(res.status_int, 503)
+ def test_requested_duplicate_ip(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ # Check configuring of duplicate IP
+ kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': ips[0]['ip_address']}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 409)
+
+ def test_requested_subnet_delete(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ req = self.new_delete_request('subnet',
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 404)
+
+ def test_requested_subnet_id(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ # Request a IP from specific subnet
+ kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ ips = port2['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.3')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+
+ def test_requested_subnet_id_v4_and_v6(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ # Get a IPv4 and IPv6 address
+ net_id = subnet['subnet']['network_id']
+ res = self._create_subnet(fmt, net_id=net_id,
+ cidr='2607:f0d0:1002:51::0/124',
+ ip_version=6, gateway_ip=None)
+ subnet2 = self.deserialize(fmt, res)
+ kwargs = {"fixed_ips":
+ [{'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet2['subnet']['id']}]}
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port3 = self.deserialize(fmt, res)
+ ips = port3['port']['fixed_ips']
+ self.assertEquals(len(ips), 2)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::2')
+ self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id'])
+ res = self._create_port(fmt, net_id=net_id)
+ port3 = self.deserialize(fmt, res)
+ # Check that a v4 and a v6 address are allocated
+ ips = port3['port']['fixed_ips']
+ self.assertEquals(len(ips), 2)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.3')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::3')
+ self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id'])
+
+ def test_range_allocation(self):
+ fmt = 'json'
+ with self.subnet(gateway='10.0.0.3',
+ cidr='10.0.0.0/29') as subnet:
+ kwargs = {"fixed_ips":
+ [{'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']}]}
+ net_id = subnet['subnet']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port = self.deserialize(fmt, res)
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 5)
+ alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5',
+ '10.0.0.6']
+ for i in range(len(alloc)):
+ self.assertEquals(ips[i]['ip_address'], alloc[i])
+ self.assertEquals(ips[i]['subnet_id'],
+ subnet['subnet']['id'])
+ with self.subnet(gateway='11.0.0.6',
+ cidr='11.0.0.0/29') as subnet:
+ kwargs = {"fixed_ips":
+ [{'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']},
+ {'subnet_id': subnet['subnet']['id']}]}
+ net_id = subnet['subnet']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port = self.deserialize(fmt, res)
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 5)
+ alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4',
+ '11.0.0.5']
+ for i in range(len(alloc)):
+ self.assertEquals(ips[i]['ip_address'], alloc[i])
+ self.assertEquals(ips[i]['subnet_id'],
+ subnet['subnet']['id'])
+
+ def test_requested_invalid_fixed_ips(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ # Test invalid subnet_id
+ kwargs = {"fixed_ips":
+ [{'subnet_id': subnet['subnet']['id']},
+ {'subnet_id':
+ '00000000-ffff-ffff-ffff-000000000000'}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 404)
+
+ # Test invalid IP address on specified subnet_id
+ kwargs = {"fixed_ips":
+ [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '1.1.1.1'}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 400)
+
+ # Test invalid addresses - IP's not on subnet or network
+ # address or broadcast address
+ bad_ips = ['1.1.1.1', '10.0.0.0', '10.0.0.255']
+ net_id = port['port']['network_id']
+ for ip in bad_ips:
+ kwargs = {"fixed_ips": [{'ip_address': ip}]}
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ self.assertEquals(res.status_int, 400)
+
+ # Enable allocation of gateway address
+ kwargs = {"fixed_ips":
+ [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '10.0.0.1'}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ ips = port2['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.1')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ self._delete('ports', port2['port']['id'])
+
+ def test_requested_split(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ # Allocate specific IP
+ kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '10.0.0.5'}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port2 = self.deserialize(fmt, res)
+ ips = port2['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.5')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ # Allocate specific IP's
+ allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6']
+ for a in allocated:
+ res = self._create_port(fmt, net_id=net_id)
+ port2 = self.deserialize(fmt, res)
+ ips = port2['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], a)
+ self.assertEquals(ips[0]['subnet_id'],
+ subnet['subnet']['id'])
+
+ def test_requested_ips_only(self):
+ fmt = 'json'
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21',
+ '10.0.0.3', '10.0.0.17', '10.0.0.19']
+ for i in ips_only:
+ kwargs = {"fixed_ips": [{'ip_address': i}]}
+ net_id = port['port']['network_id']
+ res = self._create_port(fmt, net_id=net_id, **kwargs)
+ port = self.deserialize(fmt, res)
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], i)
+ self.assertEquals(ips[0]['subnet_id'],
+ subnet['subnet']['id'])
+
+ def test_recycling(self):
+ fmt = 'json'
+ with self.subnet(cidr='10.0.1.0/24') as subnet:
+ with self.port(subnet=subnet) as port:
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.1.2')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+ net_id = port['port']['network_id']
+ ports = []
+ for i in range(16 - 3):
+ res = self._create_port(fmt, net_id=net_id)
+ p = self.deserialize(fmt, res)
+ ports.append(p)
+ for i in range(16 - 3):
+ x = random.randrange(0, len(ports), 1)
+ p = ports.pop(x)
+ self._delete('ports', p['port']['id'])
+ res = self._create_port(fmt, net_id=net_id)
+ port = self.deserialize(fmt, res)
+ ips = port['port']['fixed_ips']
+ self.assertEquals(len(ips), 1)
+ self.assertEquals(ips[0]['ip_address'], '10.0.1.3')
+ self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id'])
+
class TestNetworksV2(QuantumDbPluginV2TestCase):
# NOTE(cerberus): successful network update and delete are