self.syncmanager = syncmanager.SyncManager(self.nuageclient)
self._synchronization_thread()
+ db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
+ attributes.NETWORKS, ['_extend_network_dict_provider_nuage'])
+
def nuageclient_init(self):
server = cfg.CONF.RESTPROXY.server
serverauth = cfg.CONF.RESTPROXY.serverauth
subnets = self.get_subnets(context, filters=filters)
return bool(routers or subnets)
- def _extend_network_dict_provider(self, context, network):
- binding = nuagedb.get_network_binding(context.session, network['id'])
+ def _extend_network_dict_provider_nuage(self, network, net_db,
+ net_binding=None):
+ binding = net_db.pnetbinding if net_db else net_binding
if binding:
network[pnet.NETWORK_TYPE] = binding.network_type
network[pnet.PHYSICAL_NETWORK] = binding.physical_network
return network_type, physical_network, segmentation_id
def create_network(self, context, network):
+ binding = None
(network_type, physical_network,
vlan_id) = self._process_provider_create(context,
network['network'])
network)
self._process_l3_create(context, net, network['network'])
if network_type == 'vlan':
- nuagedb.add_network_binding(context.session, net['id'],
+ binding = nuagedb.add_network_binding(context.session,
+ net['id'],
network_type,
physical_network, vlan_id)
- self._extend_network_dict_provider(context, net)
+ self._extend_network_dict_provider_nuage(net, None, binding)
return net
def _validate_update_network(self, context, id, network):
raise n_exc.NetworkInUse(net_id=id)
return (is_external_set, subnet)
- def get_network(self, context, net_id, fields=None):
- net = super(NuagePlugin, self).get_network(context,
- net_id,
- None)
- self._extend_network_dict_provider(context, net)
- return self._fields(net, fields)
-
- def get_networks(self, context, filters=None, fields=None,
- sorts=None, limit=None, marker=None, page_reverse=False):
- nets = super(NuagePlugin,
- self).get_networks(context, filters, None, sorts,
- limit, marker, page_reverse)
- for net in nets:
- self._extend_network_dict_provider(context, net)
-
- return [self._fields(net, fields) for net in nets]
-
def update_network(self, context, id, network):
pnet._raise_if_updates_provider_attributes(network['network'])
with context.session.begin(subtransactions=True):
class TestNuageProviderNetTestCase(NuagePluginV2TestCase):
def test_create_provider_network(self):
- phy_net = uuidutils.generate_uuid()
+ phys_net = uuidutils.generate_uuid()
data = {'network': {'name': 'pnet1',
'tenant_id': 'admin',
pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: phy_net,
+ pnet.PHYSICAL_NETWORK: phys_net,
pnet.SEGMENTATION_ID: 123}}
network_req = self.new_create_request('networks', data, self.fmt)
net = self.deserialize(self.fmt, network_req.get_response(self.api))
self.assertEqual('vlan', net['network'][pnet.NETWORK_TYPE])
- self.assertEqual(phy_net, net['network'][pnet.PHYSICAL_NETWORK])
+ self.assertEqual(phys_net, net['network'][pnet.PHYSICAL_NETWORK])
self.assertEqual(123, net['network'][pnet.SEGMENTATION_ID])
def test_create_provider_network_no_admin(self):
- phy_net = uuidutils.generate_uuid()
+ phys_net = uuidutils.generate_uuid()
data = {'network': {'name': 'pnet1',
'tenant_id': 'no_admin',
pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: phy_net,
+ pnet.PHYSICAL_NETWORK: phys_net,
pnet.SEGMENTATION_ID: 123}}
network_req = self.new_create_request('networks', data, self.fmt)
network_req.environ['neutron.context'] = context.Context(
res = network_req.get_response(self.api)
self.assertEqual(exc.HTTPForbidden.code, res.status_int)
+ def test_get_network_for_provider_network(self):
+ phys_net = uuidutils.generate_uuid()
+ data = {'network': {'name': 'pnet1',
+ 'tenant_id': 'admin',
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: phys_net,
+ pnet.SEGMENTATION_ID: 123}}
+ network_req = self.new_create_request('networks', data, self.fmt)
+ res = self.deserialize(self.fmt, network_req.get_response(self.api))
+
+ get_req = self.new_show_request('networks', res['network']['id'])
+ net = self.deserialize(self.fmt, get_req.get_response(self.api))
+ self.assertEqual('vlan', net['network'][pnet.NETWORK_TYPE])
+ self.assertEqual(phys_net, net['network'][pnet.PHYSICAL_NETWORK])
+ self.assertEqual(123, net['network'][pnet.SEGMENTATION_ID])
+
+ def test_list_networks_for_provider_network(self):
+ phys_net = uuidutils.generate_uuid()
+ data1 = {'network': {'name': 'pnet1',
+ 'tenant_id': 'admin',
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: phys_net,
+ pnet.SEGMENTATION_ID: 123}}
+ network_req_1 = self.new_create_request('networks', data1, self.fmt)
+ network_req_1.get_response(self.api)
+ data2 = {'network': {'name': 'pnet2',
+ 'tenant_id': 'admin',
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: phys_net,
+ pnet.SEGMENTATION_ID: 234}}
+ network_req_2 = self.new_create_request('networks', data2, self.fmt)
+ network_req_2.get_response(self.api)
+
+ list_req = self.new_list_request('networks')
+ pnets = self.deserialize(self.fmt, list_req.get_response(self.api))
+ self.assertEqual(2, len(pnets['networks']))
+ self.assertEqual('vlan', pnets['networks'][0][pnet.NETWORK_TYPE])
+ self.assertEqual(phys_net, pnets['networks'][0][pnet.PHYSICAL_NETWORK])
+ self.assertEqual(123, pnets['networks'][0][pnet.SEGMENTATION_ID])
+ self.assertEqual('vlan', pnets['networks'][1][pnet.NETWORK_TYPE])
+ self.assertEqual(phys_net, pnets['networks'][1][pnet.PHYSICAL_NETWORK])
+ self.assertEqual(234, pnets['networks'][1][pnet.SEGMENTATION_ID])
+
class TestNuageSecurityGroupTestCase(NuagePluginV2TestCase,
test_sg.TestSecurityGroups):