except exc.NoResultFound:
return []
+ def _get_subnets_by_network(self, context, network_id):
+ try:
+ subnet_qry = context.session.query(models_v2.Subnet)
+ return subnet_qry.filter_by(network_id=network_id).all()
+ except exc.NoResultFound:
+ return []
+
def _fields(self, resource, fields):
if fields:
return dict(((key, item) for key, item in resource.iteritems()
if 'shared' in n:
self._validate_shared_update(context, id, network, n)
network.update(n)
+ # also update shared in all the subnets for this network
+ subnets = self._get_subnets_by_network(context, id)
+ for subnet in subnets:
+ subnet['shared'] = network['shared']
return self._make_network_dict(network)
def delete_network(self, context, id):
with context.session.begin(subtransactions=True):
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
+ # The 'shared' attribute for subnets is for internal plugin
+ # use only. It is not exposed through the API
subnet = models_v2.Subnet(tenant_id=tenant_id,
id=s.get('id') or utils.str_uuid(),
name=s['name'],
ip_version=s['ip_version'],
cidr=s['cidr'],
enable_dhcp=s['enable_dhcp'],
- gateway_ip=s['gateway_ip'])
+ gateway_ip=s['gateway_ip'],
+ shared=network.shared)
# perform allocate pools first, since it might raise an error
pools = self._allocate_pools_for_subnet(context, s)
for arg in ('allocation_pools',
'ip_version', 'tenant_id',
'enable_dhcp', 'allocation_pools',
- 'dns_nameservers', 'host_routes'):
+ 'dns_nameservers', 'host_routes',
+ 'shared'):
# Arg must be present and not null (but can be false)
if arg in kwargs and kwargs[arg] is not None:
data['subnet'][arg] = kwargs[arg]
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True,
- dns_nameservers=None, host_routes=None):
+ dns_nameservers=None, host_routes=None, shared=None):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
ip_version=ip_version,
enable_dhcp=enable_dhcp,
dns_nameservers=dns_nameservers,
- host_routes=host_routes)
+ host_routes=host_routes,
+ shared=shared)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= 400:
allocation_pools=None,
enable_dhcp=True,
dns_nameservers=None,
- host_routes=None):
+ host_routes=None,
+ shared=None):
# TODO(anyone) DRY this
# NOTE(salvatore-orlando): we can pass the network object
# to gen function anyway, and then avoid the repetition
ip_version,
enable_dhcp,
dns_nameservers,
- host_routes)
+ host_routes,
+ shared=shared)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
else:
ip_version,
enable_dhcp,
dns_nameservers,
- host_routes)
+ host_routes,
+ shared=shared)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
self.assertTrue('mac_address' in port['port'])
self._delete('ports', port['port']['id'])
+ def test_create_port_public_network_with_ip(self):
+ with self.network(shared=True) as network:
+ with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
+ keys = [('admin_state_up', True), ('status', 'ACTIVE'),
+ ('fixed_ips', [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '10.0.0.2'}])]
+ port_res = self._create_port('json',
+ network['network']['id'],
+ 201,
+ tenant_id='another_tenant',
+ set_context=True)
+ port = self.deserialize('json', port_res)
+ for k, v in keys:
+ self.assertEquals(port['port'][k], v)
+ self.assertTrue('mac_address' in port['port'])
+ self._delete('ports', port['port']['id'])
+
def test_create_ports_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
res = self.deserialize('json', req.get_response(self.api))
self.assertTrue(res['network']['shared'])
+ def test_update_network_with_subnet_set_shared(self):
+ with self.network(shared=False) as network:
+ with self.subnet(network=network) as subnet:
+ data = {'network': {'shared': True}}
+ req = self.new_update_request('networks',
+ data,
+ network['network']['id'])
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertTrue(res['network']['shared'])
+ # must query db to see whether subnet's shared attribute
+ # has been updated or not
+ ctx = context.Context('', '', is_admin=True)
+ subnet_db = QuantumManager.get_plugin()._get_subnet(
+ ctx, subnet['subnet']['id'])
+ self.assertEqual(subnet_db['shared'], True)
+
def test_update_network_set_not_shared_single_tenant(self):
with self.network(shared=True) as network:
res1 = self._create_port('json',
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 400)
+ def test_create_subnet_shared_returns_422(self):
+ cidr = '10.0.0.0/24'
+ with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ self._test_create_subnet(cidr=cidr,
+ shared=True)
+ self.assertEquals(ctx_manager.exception.code, 422)
+
def test_update_subnet(self):
with self.subnet() as subnet:
data = {'subnet': {'gateway_ip': '11.0.0.1'}}
self.assertEqual(res['subnet']['gateway_ip'],
data['subnet']['gateway_ip'])
+ def test_update_subnet_shared_returns_422(self):
+ with self.network(shared=True) as network:
+ with self.subnet(network=network) as subnet:
+ data = {'subnet': {'shared': True}}
+ req = self.new_update_request('subnets', data,
+ subnet['subnet']['id'])
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 422)
+
def test_show_subnet(self):
with self.network() as network:
with self.subnet(network=network) as subnet: