# This external port is never exposed to the tenant.
# it is used purely for internal system and admin use when
# managing floating IPs.
- external_port = self._core_plugin.create_port(context.elevated(), {
- 'port':
- {'tenant_id': '', # tenant intentionally not set
- 'network_id': f_net_id,
- 'mac_address': attributes.ATTR_NOT_SPECIFIED,
- 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
- 'admin_state_up': True,
- 'device_id': fip_id,
- 'device_owner': DEVICE_OWNER_FLOATINGIP,
- 'name': ''}})
+
+ port = {'tenant_id': '', # tenant intentionally not set
+ 'network_id': f_net_id,
+ 'mac_address': attributes.ATTR_NOT_SPECIFIED,
+ 'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
+ 'admin_state_up': True,
+ 'device_id': fip_id,
+ 'device_owner': DEVICE_OWNER_FLOATINGIP,
+ 'name': ''}
+
+ if fip.get('floating_ip_address'):
+ port['fixed_ips'] = [
+ {'ip_address': fip['floating_ip_address']}]
+
+ external_port = self._core_plugin.create_port(context.elevated(),
+ {'port': port})
+
# Ensure IP addresses are allocated on external port
if not external_port['fixed_ips']:
raise n_exc.ExternalIpAddressExhausted(net_id=f_net_id)
{'network': {external_net.EXTERNAL: True}})
def _create_floatingip(self, fmt, network_id, port_id=None,
- fixed_ip=None, set_context=False):
+ fixed_ip=None, set_context=False,
+ floating_ip=None):
data = {'floatingip': {'floating_network_id': network_id,
'tenant_id': self._tenant_id}}
if port_id:
data['floatingip']['port_id'] = port_id
if fixed_ip:
data['floatingip']['fixed_ip_address'] = fixed_ip
+
+ if floating_ip:
+ data['floatingip']['floating_ip_address'] = floating_ip
+
floatingip_req = self.new_create_request('floatingips', data, fmt)
if set_context and self._tenant_id:
# create a specific auth context for this request
return floatingip_req.get_response(self.ext_api)
def _make_floatingip(self, fmt, network_id, port_id=None,
- fixed_ip=None, set_context=False):
+ fixed_ip=None, set_context=False, floating_ip=None,
+ http_status=exc.HTTPCreated.code):
res = self._create_floatingip(fmt, network_id, port_id,
- fixed_ip, set_context)
- self.assertEqual(res.status_int, exc.HTTPCreated.code)
+ fixed_ip, set_context, floating_ip)
+ self.assertEqual(res.status_int, http_status)
return self.deserialize(fmt, res)
def _validate_floating_ip(self, fip):
with self.subnet(network=net):
self._make_floatingip(self.fmt, net_id)
+ def test_create_floatingip_with_specific_ip(self):
+ with self.subnet(cidr='10.0.0.0/24') as s:
+ network_id = s['subnet']['network_id']
+ self._set_net_external(network_id)
+ fp = self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.0.10')
+ try:
+ self.assertEqual(fp['floatingip']['floating_ip_address'],
+ '10.0.0.10')
+ finally:
+ self._delete('floatingips', fp['floatingip']['id'])
+
+ def test_create_floatingip_with_specific_ip_out_of_allocation(self):
+ with self.subnet(cidr='10.0.0.0/24',
+ allocation_pools=[
+ {'start': '10.0.0.10', 'end': '10.0.0.20'}]
+ ) as s:
+ network_id = s['subnet']['network_id']
+ self._set_net_external(network_id)
+ fp = self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.0.30')
+ try:
+ self.assertEqual(fp['floatingip']['floating_ip_address'],
+ '10.0.0.30')
+ finally:
+ self._delete('floatingips', fp['floatingip']['id'])
+
+ def test_create_floatingip_with_specific_ip_non_admin(self):
+ ctx = context.Context('user_id', 'tenant_id')
+
+ with self.subnet(cidr='10.0.0.0/24') as s:
+ network_id = s['subnet']['network_id']
+ self._set_net_external(network_id)
+ self._make_floatingip(self.fmt, network_id,
+ set_context=ctx,
+ floating_ip='10.0.0.10',
+ http_status=exc.HTTPForbidden.code)
+
+ def test_create_floatingip_with_specific_ip_out_of_subnet(self):
+
+ with self.subnet(cidr='10.0.0.0/24') as s:
+ network_id = s['subnet']['network_id']
+ self._set_net_external(network_id)
+ self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.1.10',
+ http_status=exc.HTTPBadRequest.code)
+
+ def test_create_floatingip_with_duplicated_specific_ip(self):
+
+ with self.subnet(cidr='10.0.0.0/24') as s:
+ network_id = s['subnet']['network_id']
+ self._set_net_external(network_id)
+ fp1 = self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.0.10')
+
+ try:
+ self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.0.10',
+ http_status=exc.HTTPConflict.code)
+ finally:
+ self._delete('floatingips', fp1['floatingip']['id'])
+
class L3AgentDbTestCaseBase(L3NatTestCaseMixin):