main_ls = [ls for ls in lswitches if ls['uuid'] == network.id]
tag_dict = dict((x['scope'], x['tag']) for x in main_ls[0]['tags'])
if 'multi_lswitch' not in tag_dict:
+ tags = main_ls[0]['tags']
+ tags.append({'tag': 'True', 'scope': 'multi_lswitch'})
nvplib.update_lswitch(cluster,
main_ls[0]['uuid'],
main_ls[0]['display_name'],
network['tenant_id'],
- tags=[{'tag': 'True',
- 'scope': 'multi_lswitch'}])
+ tags=tags)
selected_lswitch = nvplib.create_lswitch(
cluster, network.tenant_id,
"%s-ext-%s" % (network.name, len(lswitches)),
target_cluster, tenant_id, net_data.get('name'),
nvp_binding_type,
net_data.get(pnet.PHYSICAL_NETWORK),
- net_data.get(pnet.SEGMENTATION_ID))
+ net_data.get(pnet.SEGMENTATION_ID),
+ shared=net_data.get(attr.SHARED))
net_data['id'] = lswitch['uuid']
with context.session.begin(subtransactions=True):
else:
tenant_ids = tenant_ids or [context.tenant_id]
tenant_filter = ''.join(filter_fmt % tid for tid in tenant_ids)
-
lswitch_filters = "uuid,display_name,fabric_status,tags"
- lswitch_url_path = (
+ lswitch_url_path_1 = (
"/ws.v1/lswitch?fields=%s&relations=LogicalSwitchStatus%s"
% (lswitch_filters, tenant_filter))
+ lswitch_url_path_2 = nvplib._build_uri_path(
+ nvplib.LSWITCH_RESOURCE,
+ fields=lswitch_filters,
+ relations='LogicalSwitchStatus',
+ filters={'tag': 'true', 'tag_scope': 'shared'})
try:
for c in self.clusters.itervalues():
res = nvplib.get_all_query_pages(
- lswitch_url_path, c)
+ lswitch_url_path_1, c)
nvp_lswitches.update(dict(
(ls['uuid'], ls) for ls in res))
+ # Issue a second query for fetching shared networks.
+ # We cannot unfortunately use just a single query because tags
+ # cannot be or-ed
+ res_shared = nvplib.get_all_query_pages(
+ lswitch_url_path_2, c)
+ nvp_lswitches.update(dict(
+ (ls['uuid'], ls) for ls in res_shared))
except Exception:
err_msg = _("Unable to get logical switches")
LOG.exception(err_msg)
transport_zone_uuid=None,
vlan_id=None,
quantum_net_id=None,
+ shared=None,
**kwargs):
nvp_binding_type = transport_type
if transport_type in ('flat', 'vlan'):
if quantum_net_id:
lswitch_obj["tags"].append({"tag": quantum_net_id,
"scope": "quantum_net_id"})
+ if shared:
+ lswitch_obj["tags"].append({"tag": "true",
+ "scope": "shared"})
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
uri = _build_uri_path(LSWITCH_RESOURCE)
self._test_list_resources('network', [net1, net2],
query_params=query_params)
+ def test_delete_network_after_removing_subet(self):
+ gateway_ip = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ fmt = 'json'
+ # Create new network
+ res = self._create_network(fmt=fmt, name='net',
+ admin_state_up=True)
+ network = self.deserialize(fmt, res)
+ subnet = self._make_subnet(fmt, network, gateway_ip,
+ cidr, ip_version=4)
+ req = self.new_delete_request('subnets', subnet['subnet']['id'])
+ sub_del_res = req.get_response(self.api)
+ self.assertEqual(sub_del_res.status_int, 204)
+ req = self.new_delete_request('networks', network['network']['id'])
+ net_del_res = req.get_response(self.api)
+ self.assertEqual(net_del_res.status_int, 204)
+
+ def test_list_networks_with_shared(self):
+ with self.network(name='net1') as net1:
+ with self.network(name='net2', shared=True) as net2:
+ req = self.new_list_request('networks')
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(len(res['networks']), 2)
+ req_2 = self.new_list_request('networks')
+ req_2.environ['quantum.context'] = context.Context('',
+ 'somebody')
+ res = self.deserialize('json', req_2.get_response(self.api))
+ # tenant must see a single network
+ self.assertEqual(len(res['networks']), 1)
+
class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase):