import neutron.plugins.nicira.api_client.client_eventlet as client
from neutron.plugins.nicira import extensions
-import neutron.plugins.nicira.NvpApiClient as nvpapi
+import neutron.plugins.nicira.NvpApiClient as nsxapi
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira.vshield.common import VcnsApiClient as vcnsapi
from neutron.plugins.nicira.vshield import vcns
plugin = neutron_plugin.NsxPlugin
service_plugin = neutron_plugin.NsxServicePlugin
-api_helper = nvpapi.NVPApiHelper
+api_helper = nsxapi.NVPApiHelper
api_client = client.NvpApiClientEventlet
vcns_class = vcns.Vcns
vcns_driver = vcnsdriver.VcnsDriver
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2012 Nicira Networks, Inc.
+# Copyright 2012 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
-from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import NvpApiClient as api_client
LOG = logging.getLogger(__name__)
'gateway_ip_address', '0.0.0.0')
else:
fake_lrouter['default_next_hop'] = '0.0.0.0'
- # NOTE(salv-orlando): We won't make the Fake NVP API client
- # aware of NVP version. The long term plan is to replace it
- # with behavioral mocking of NVP API requests
+ # NOTE(salv-orlando): We won't make the Fake NSX API client
+ # aware of NSX version. The long term plan is to replace it
+ # with behavioral mocking of NSX API requests
if 'distributed' not in fake_lrouter:
fake_lrouter['distributed'] = False
distributed_json = ('"distributed": %s,' %
try:
fake_lrouter = self._fake_lrouter_dict[lr_uuid]
except KeyError:
- raise NvpApiClient.ResourceNotFound()
+ raise api_client.ResourceNotFound()
fake_lrouter['lport_count'] += 1
fake_lport_status = fake_lport.copy()
fake_lport_status['lr_tenant_id'] = fake_lrouter['tenant_id']
for res_uuid in res_dict if res_uuid == target_uuid]
if items:
return json.dumps(items[0])
- raise NvpApiClient.ResourceNotFound()
+ raise api_client.ResourceNotFound()
def handle_get(self, url):
#TODO(salvatore-orlando): handle field selection
relations = urlparse.parse_qs(parsedurl.query).get('relations')
response_file = self.FAKE_GET_RESPONSES.get(res_type)
if not response_file:
- raise NvpApiClient.NvpApiException()
+ raise api_client.NvpApiException()
if 'lport' in res_type or 'nat' in res_type:
if len(uuids) > 1:
return self._show(res_type, response_file, uuids[0],
try:
resource = res_dict[uuids[-1]]
except KeyError:
- raise NvpApiClient.ResourceNotFound()
+ raise api_client.ResourceNotFound()
if not is_attachment:
edit_resource = getattr(self, '_build_%s' % res_type, None)
if edit_resource:
try:
del res_dict[uuids[-1]]
except KeyError:
- raise NvpApiClient.ResourceNotFound()
+ raise api_client.ResourceNotFound()
return ""
def fake_request(self, *args, **kwargs):
from neutron.tests import base
-class NvpApiCommonTest(base.BaseTestCase):
+class ApiCommonTest(base.BaseTestCase):
def test_conn_str(self):
conn = httplib.HTTPSConnection('localhost', 4242, timeout=0)
return urllib2.urlopen(url).read()
-class NvpApiRequestEventletTest(base.BaseTestCase):
+class ApiRequestEventletTest(base.BaseTestCase):
def setUp(self):
- super(NvpApiRequestEventletTest, self).setUp()
+ super(ApiRequestEventletTest, self).setUp()
self.client = nace.NvpApiClientEventlet(
[("127.0.0.1", 4401, True)], "admin", "admin")
self.url = "/ws.v1/_debug"
def tearDown(self):
self.client = None
self.req = None
- super(NvpApiRequestEventletTest, self).tearDown()
+ super(ApiRequestEventletTest, self).tearDown()
def test_construct_eventlet_api_request(self):
e = nare.NvpApiRequestEventlet(self.client, self.url)
from neutron.db import api as db
from neutron.db import models_v2
from neutron.openstack.common.db import exception as d_exc
-from neutron.plugins.nicira.dbexts import nicira_db
-from neutron.plugins.nicira.dbexts import nicira_models
+from neutron.plugins.nicira.dbexts import nicira_db as nsx_db
+from neutron.plugins.nicira.dbexts import nicira_models as models
from neutron.tests import base
-class NiciraDBTestCase(base.BaseTestCase):
+class NsxDBTestCase(base.BaseTestCase):
def setUp(self):
- super(NiciraDBTestCase, self).setUp()
+ super(NsxDBTestCase, self).setUp()
db.configure_db()
self.ctx = context.get_admin_context()
self.addCleanup(db.clear_db)
nsx_switch_id = 'foo_nsx_switch_id'
self._setup_neutron_network_and_port(neutron_net_id, neutron_port_id)
- nicira_db.add_neutron_nsx_port_mapping(
+ nsx_db.add_neutron_nsx_port_mapping(
self.ctx.session, neutron_port_id, nsx_switch_id, nsx_port_id)
# Call the method twice to trigger a db duplicate constraint error
- nicira_db.add_neutron_nsx_port_mapping(
+ nsx_db.add_neutron_nsx_port_mapping(
self.ctx.session, neutron_port_id, nsx_switch_id, nsx_port_id)
- result = (self.ctx.session.query(nicira_models.NeutronNsxPortMapping).
+ result = (self.ctx.session.query(models.NeutronNsxPortMapping).
filter_by(neutron_id=neutron_port_id).one())
self.assertEqual(nsx_port_id, result.nsx_port_id)
self.assertEqual(neutron_port_id, result.neutron_id)
nsx_switch_id = 'foo_nsx_switch_id'
self._setup_neutron_network_and_port(neutron_net_id, neutron_port_id)
- nicira_db.add_neutron_nsx_port_mapping(
+ nsx_db.add_neutron_nsx_port_mapping(
self.ctx.session, neutron_port_id, nsx_switch_id, nsx_port_id_1)
# Call the method twice to trigger a db duplicate constraint error,
# this time with a different nsx port id!
self.assertRaises(d_exc.DBDuplicateEntry,
- nicira_db.add_neutron_nsx_port_mapping,
+ nsx_db.add_neutron_nsx_port_mapping,
self.ctx.session, neutron_port_id,
nsx_switch_id, nsx_port_id_2)
nsx_port_id = 'foo_nsx_port_id'
nsx_switch_id = 'foo_nsx_switch_id'
self.assertRaises(d_exc.DBError,
- nicira_db.add_neutron_nsx_port_mapping,
+ nsx_db.add_neutron_nsx_port_mapping,
self.ctx.session, neutron_port_id,
nsx_switch_id, nsx_port_id)
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.tests.unit import test_extension_allowedaddresspairs as ext_pairs
+from neutron.tests.unit.vmware import test_nsx_plugin
+
+
+class TestAllowedAddressPairs(test_nsx_plugin.NsxPluginV2TestCase,
+ ext_pairs.TestAllowedAddressPairs):
+ pass
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.NvpApiClient import NVPVersion
from neutron.tests.unit import test_db_plugin
-from neutron.tests.unit.vmware import fake_nvpapiclient
+from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import NSXEXT_PATH
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
ext_mgr = MacLearningExtensionManager()
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
- instance = self.mock_nvpapi.start()
+ # mock api client
+ self.fc = fake.FakeClient(STUBS_PATH)
+ self.mock_nsx = mock.patch(NSXAPI_NAME, autospec=True)
+ instance = self.mock_nsx.start()
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- # Emulate tests against NVP 2.x
+ # Emulate tests against NSX 2.x
instance.return_value.get_nvp_version.return_value = NVPVersion("3.0")
- instance.return_value.request.side_effect = _fake_request
+ instance.return_value.request.side_effect = self.fc.fake_request
cfg.CONF.set_override('metadata_mode', None, 'NSX')
self.addCleanup(self.fc.reset_all)
- self.addCleanup(self.mock_nvpapi.stop)
+ self.addCleanup(self.mock_nsx.stop)
self.addCleanup(patch_sync.stop)
self.addCleanup(self.restore_resource_attribute_map)
self.addCleanup(cfg.CONF.reset)
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# under the License.
import contextlib
-
import mock
+
from oslo.config import cfg
from webob import exc
import webtest
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron import manager
+from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.dbexts import networkgw_db
from neutron.plugins.nicira.extensions import networkgw
-from neutron.plugins.nicira.NeutronPlugin import NVP_EXT_PATH
+from neutron.plugins.nicira import nsxlib
+from neutron.plugins.nicira import NvpApiClient
from neutron import quota
from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_extensions
-
+from neutron.tests.unit.vmware import NSXEXT_PATH
+from neutron.tests.unit.vmware import PLUGIN_NAME
+from neutron.tests.unit.vmware.test_nsx_plugin import NsxPluginV2TestCase
_uuid = test_api_v2._uuid
_get_path = test_api_v2._get_path
'vlan', 555)
+class TestNetworkGateway(NsxPluginV2TestCase,
+ NetworkGatewayDbTestCase):
+
+ def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
+ cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
+ super(TestNetworkGateway,
+ self).setUp(plugin=plugin, ext_mgr=ext_mgr)
+
+ def test_create_network_gateway_name_exceeds_40_chars(self):
+ name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
+ with self._network_gateway(name=name) as nw_gw:
+ # Assert Neutron name is not truncated
+ self.assertEqual(nw_gw[self.resource]['name'], name)
+
+ def test_update_network_gateway_with_name_calls_backend(self):
+ with mock.patch.object(
+ nsxlib.l2gateway, 'update_l2_gw_service') as mock_update_gw:
+ with self._network_gateway(name='cavani') as nw_gw:
+ nw_gw_id = nw_gw[self.resource]['id']
+ self._update(networkgw.COLLECTION_NAME, nw_gw_id,
+ {self.resource: {'name': 'higuain'}})
+ mock_update_gw.assert_called_once_with(
+ mock.ANY, nw_gw_id, 'higuain')
+
+ def test_update_network_gateway_without_name_does_not_call_backend(self):
+ with mock.patch.object(
+ nsxlib.l2gateway, 'update_l2_gw_service') as mock_update_gw:
+ with self._network_gateway(name='something') as nw_gw:
+ nw_gw_id = nw_gw[self.resource]['id']
+ self._update(networkgw.COLLECTION_NAME, nw_gw_id,
+ {self.resource: {}})
+ self.assertEqual(mock_update_gw.call_count, 0)
+
+ def test_update_network_gateway_name_exceeds_40_chars(self):
+ new_name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
+ with self._network_gateway(name='something') as nw_gw:
+ nw_gw_id = nw_gw[self.resource]['id']
+ self._update(networkgw.COLLECTION_NAME, nw_gw_id,
+ {self.resource: {'name': new_name}})
+ req = self.new_show_request(networkgw.COLLECTION_NAME,
+ nw_gw_id)
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ # Assert Neutron name is not truncated
+ self.assertEqual(new_name, res[self.resource]['name'])
+ # Assert NSX name is truncated
+ self.assertEqual(
+ new_name[:40],
+ self.fc._fake_gatewayservice_dict[nw_gw_id]['display_name'])
+
+ def test_create_network_gateway_nsx_error_returns_500(self):
+ def raise_nsx_api_exc(*args, **kwargs):
+ raise NvpApiClient.NvpApiException
+
+ with mock.patch.object(nsxlib.l2gateway,
+ 'create_l2_gw_service',
+ new=raise_nsx_api_exc):
+ res = self._create_network_gateway(
+ self.fmt, 'xxx', name='yyy',
+ devices=[{'id': uuidutils.generate_uuid()}])
+ self.assertEqual(500, res.status_int)
+
+ def test_create_network_gateway_nsx_error_returns_409(self):
+ with mock.patch.object(nsxlib.l2gateway,
+ 'create_l2_gw_service',
+ side_effect=NvpApiClient.Conflict):
+ res = self._create_network_gateway(
+ self.fmt, 'xxx', name='yyy',
+ devices=[{'id': uuidutils.generate_uuid()}])
+ self.assertEqual(409, res.status_int)
+
+ def test_list_network_gateways(self):
+ with self._network_gateway(name='test-gw-1') as gw1:
+ with self._network_gateway(name='test_gw_2') as gw2:
+ req = self.new_list_request(networkgw.COLLECTION_NAME)
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ # We expect the default gateway too
+ key = self.resource + 's'
+ self.assertEqual(len(res[key]), 3)
+ self.assertEqual(res[key][0]['default'],
+ True)
+ self.assertEqual(res[key][1]['name'],
+ gw1[self.resource]['name'])
+ self.assertEqual(res[key][2]['name'],
+ gw2[self.resource]['name'])
+
+ def test_list_network_gateway_with_multiple_connections(self):
+ self._test_list_network_gateway_with_multiple_connections(
+ expected_gateways=2)
+
+ def test_delete_network_gateway(self):
+ # The default gateway must still be there
+ self._test_delete_network_gateway(1)
+
+ def test_show_network_gateway_nsx_error_returns_404(self):
+ invalid_id = 'b5afd4a9-eb71-4af7-a082-8fc625a35b61'
+ req = self.new_show_request(networkgw.COLLECTION_NAME, invalid_id)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(exc.HTTPNotFound.code, res.status_int)
+
+
class TestNetworkGatewayPlugin(db_base_plugin_v2.NeutronDbPluginV2,
networkgw_db.NetworkGatewayMixin):
"""Simple plugin class for testing db support for network gateway ext."""
def __init__(self, **args):
super(TestNetworkGatewayPlugin, self).__init__(**args)
- extensions.append_api_extensions_path([NVP_EXT_PATH])
+ extensions.append_api_extensions_path([NSXEXT_PATH])
def delete_port(self, context, id, nw_gw_port_check=True):
if nw_gw_port_check:
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.common import test_lib
+from neutron.plugins.nicira.common import sync
+from neutron.tests.unit import test_extension_portsecurity as psec
+from neutron.tests.unit.vmware.apiclient import fake
+from neutron.tests.unit.vmware import get_fake_conf
+from neutron.tests.unit.vmware import NSXAPI_NAME
+from neutron.tests.unit.vmware import PLUGIN_NAME
+from neutron.tests.unit.vmware import STUBS_PATH
+
+
+class PortSecurityTestCase(psec.PortSecurityDBTestCase):
+
+ def setUp(self):
+ test_lib.test_config['config_files'] = [get_fake_conf('nsx.ini.test')]
+ # mock api client
+ self.fc = fake.FakeClient(STUBS_PATH)
+ self.mock_nsx = mock.patch(NSXAPI_NAME, autospec=True)
+ instance = self.mock_nsx.start()
+ instance.return_value.login.return_value = "the_cookie"
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
+
+ instance.return_value.request.side_effect = self.fc.fake_request
+ super(PortSecurityTestCase, self).setUp(PLUGIN_NAME)
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(self.mock_nsx.stop)
+ self.addCleanup(patch_sync.stop)
+
+
+class TestPortSecurity(PortSecurityTestCase, psec.TestPortSecurity):
+ pass
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo.config import cfg
+
+from neutron.extensions import multiprovidernet as mpnet
+from neutron.extensions import providernet as pnet
+from neutron.tests.unit.vmware import NSXEXT_PATH
+from neutron.tests.unit.vmware.test_nsx_plugin import NsxPluginV2TestCase
+
+
+class TestProvidernet(NsxPluginV2TestCase):
+
+ def test_create_provider_network_default_physical_net(self):
+ data = {'network': {'name': 'net1',
+ 'admin_state_up': True,
+ 'tenant_id': 'admin',
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.SEGMENTATION_ID: 411}}
+ network_req = self.new_create_request('networks', data, self.fmt)
+ net = self.deserialize(self.fmt, network_req.get_response(self.api))
+ self.assertEqual(net['network'][pnet.NETWORK_TYPE], 'vlan')
+ self.assertEqual(net['network'][pnet.SEGMENTATION_ID], 411)
+
+ def test_create_provider_network(self):
+ data = {'network': {'name': 'net1',
+ 'admin_state_up': True,
+ 'tenant_id': 'admin',
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.SEGMENTATION_ID: 411,
+ pnet.PHYSICAL_NETWORK: 'physnet1'}}
+ network_req = self.new_create_request('networks', data, self.fmt)
+ net = self.deserialize(self.fmt, network_req.get_response(self.api))
+ self.assertEqual(net['network'][pnet.NETWORK_TYPE], 'vlan')
+ self.assertEqual(net['network'][pnet.SEGMENTATION_ID], 411)
+ self.assertEqual(net['network'][pnet.PHYSICAL_NETWORK], 'physnet1')
+
+
+class TestMultiProviderNetworks(NsxPluginV2TestCase):
+
+ def setUp(self, plugin=None):
+ cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
+ super(TestMultiProviderNetworks, self).setUp()
+
+ def test_create_network_provider(self):
+ data = {'network': {'name': 'net1',
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1,
+ 'tenant_id': 'tenant_one'}}
+ network_req = self.new_create_request('networks', data)
+ network = self.deserialize(self.fmt,
+ network_req.get_response(self.api))
+ self.assertEqual(network['network'][pnet.NETWORK_TYPE], 'vlan')
+ self.assertEqual(network['network'][pnet.PHYSICAL_NETWORK], 'physnet1')
+ self.assertEqual(network['network'][pnet.SEGMENTATION_ID], 1)
+ self.assertNotIn(mpnet.SEGMENTS, network['network'])
+
+ def test_create_network_single_multiple_provider(self):
+ data = {'network': {'name': 'net1',
+ mpnet.SEGMENTS:
+ [{pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1}],
+ 'tenant_id': 'tenant_one'}}
+ net_req = self.new_create_request('networks', data)
+ network = self.deserialize(self.fmt, net_req.get_response(self.api))
+ for provider_field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
+ pnet.SEGMENTATION_ID]:
+ self.assertNotIn(provider_field, network['network'])
+ tz = network['network'][mpnet.SEGMENTS][0]
+ self.assertEqual(tz[pnet.NETWORK_TYPE], 'vlan')
+ self.assertEqual(tz[pnet.PHYSICAL_NETWORK], 'physnet1')
+ self.assertEqual(tz[pnet.SEGMENTATION_ID], 1)
+
+ # Tests get_network()
+ net_req = self.new_show_request('networks', network['network']['id'])
+ network = self.deserialize(self.fmt, net_req.get_response(self.api))
+ tz = network['network'][mpnet.SEGMENTS][0]
+ self.assertEqual(tz[pnet.NETWORK_TYPE], 'vlan')
+ self.assertEqual(tz[pnet.PHYSICAL_NETWORK], 'physnet1')
+ self.assertEqual(tz[pnet.SEGMENTATION_ID], 1)
+
+ def test_create_network_multprovider(self):
+ data = {'network': {'name': 'net1',
+ mpnet.SEGMENTS:
+ [{pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1},
+ {pnet.NETWORK_TYPE: 'stt',
+ pnet.PHYSICAL_NETWORK: 'physnet1'}],
+ 'tenant_id': 'tenant_one'}}
+ network_req = self.new_create_request('networks', data)
+ network = self.deserialize(self.fmt,
+ network_req.get_response(self.api))
+ tz = network['network'][mpnet.SEGMENTS]
+ for tz in data['network'][mpnet.SEGMENTS]:
+ for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
+ pnet.SEGMENTATION_ID]:
+ self.assertEqual(tz.get(field), tz.get(field))
+
+ # Tests get_network()
+ net_req = self.new_show_request('networks', network['network']['id'])
+ network = self.deserialize(self.fmt, net_req.get_response(self.api))
+ tz = network['network'][mpnet.SEGMENTS]
+ for tz in data['network'][mpnet.SEGMENTS]:
+ for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
+ pnet.SEGMENTATION_ID]:
+ self.assertEqual(tz.get(field), tz.get(field))
+
+ def test_create_network_with_provider_and_multiprovider_fail(self):
+ data = {'network': {'name': 'net1',
+ mpnet.SEGMENTS:
+ [{pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1}],
+ pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1,
+ 'tenant_id': 'tenant_one'}}
+
+ network_req = self.new_create_request('networks', data)
+ res = network_req.get_response(self.api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_create_network_duplicate_segments(self):
+ data = {'network': {'name': 'net1',
+ mpnet.SEGMENTS:
+ [{pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1},
+ {pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1}],
+ 'tenant_id': 'tenant_one'}}
+ network_req = self.new_create_request('networks', data)
+ res = network_req.get_response(self.api)
+ self.assertEqual(res.status_int, 400)
--- /dev/null
+# Copyright (c) 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+
+import mock
+from oslo.config import cfg
+import webob.exc
+
+from neutron import context
+from neutron.plugins.nicira.dbexts import qos_db
+from neutron.plugins.nicira.extensions import qos as ext_qos
+from neutron.plugins.nicira import nsxlib
+from neutron.tests.unit import test_extensions
+from neutron.tests.unit.vmware import NSXEXT_PATH
+from neutron.tests.unit.vmware.test_nsx_plugin import NsxPluginV2TestCase
+
+
+class QoSTestExtensionManager(object):
+
+ def get_resources(self):
+ return ext_qos.Qos.get_resources()
+
+ def get_actions(self):
+ return []
+
+ def get_request_extensions(self):
+ return []
+
+
+class TestQoSQueue(NsxPluginV2TestCase):
+
+ def setUp(self, plugin=None):
+ cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
+ super(TestQoSQueue, self).setUp()
+ ext_mgr = QoSTestExtensionManager()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+
+ def _create_qos_queue(self, fmt, body, **kwargs):
+ qos_queue = self.new_create_request('qos-queues', body)
+ if (kwargs.get('set_context') and 'tenant_id' in kwargs):
+ # create a specific auth context for this request
+ qos_queue.environ['neutron.context'] = context.Context(
+ '', kwargs['tenant_id'])
+
+ return qos_queue.get_response(self.ext_api)
+
+ @contextlib.contextmanager
+ def qos_queue(self, name='foo', min='0', max='10',
+ qos_marking=None, dscp='0', default=None, no_delete=False):
+
+ body = {'qos_queue': {'tenant_id': 'tenant',
+ 'name': name,
+ 'min': min,
+ 'max': max}}
+
+ if qos_marking:
+ body['qos_queue']['qos_marking'] = qos_marking
+ if dscp:
+ body['qos_queue']['dscp'] = dscp
+ if default:
+ body['qos_queue']['default'] = default
+ res = self._create_qos_queue('json', body)
+ qos_queue = self.deserialize('json', res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ try:
+ yield qos_queue
+ finally:
+ if not no_delete:
+ self._delete('qos-queues',
+ qos_queue['qos_queue']['id'])
+
+ def test_create_qos_queue(self):
+ with self.qos_queue(name='fake_lqueue', min=34, max=44,
+ qos_marking='untrusted', default=False) as q:
+ self.assertEqual(q['qos_queue']['name'], 'fake_lqueue')
+ self.assertEqual(q['qos_queue']['min'], 34)
+ self.assertEqual(q['qos_queue']['max'], 44)
+ self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
+ self.assertFalse(q['qos_queue']['default'])
+
+ def test_create_trusted_qos_queue(self):
+ with mock.patch.object(qos_db.LOG, 'info') as log:
+ with mock.patch.object(nsxlib.queue, 'do_request',
+ return_value={"uuid": "fake_queue"}):
+ with self.qos_queue(name='fake_lqueue', min=34, max=44,
+ qos_marking='trusted', default=False) as q:
+ self.assertIsNone(q['qos_queue']['dscp'])
+ self.assertTrue(log.called)
+
+ def test_create_qos_queue_name_exceeds_40_chars(self):
+ name = 'this_is_a_queue_whose_name_is_longer_than_40_chars'
+ with self.qos_queue(name=name) as queue:
+ # Assert Neutron name is not truncated
+ self.assertEqual(queue['qos_queue']['name'], name)
+
+ def test_create_qos_queue_default(self):
+ with self.qos_queue(default=True) as q:
+ self.assertTrue(q['qos_queue']['default'])
+
+ def test_create_qos_queue_two_default_queues_fail(self):
+ with self.qos_queue(default=True):
+ body = {'qos_queue': {'tenant_id': 'tenant',
+ 'name': 'second_default_queue',
+ 'default': True}}
+ res = self._create_qos_queue('json', body)
+ self.assertEqual(res.status_int, 409)
+
+ def test_create_port_with_queue(self):
+ with self.qos_queue(default=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ q1['qos_queue']['id'])
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ with self.port(device_id=device_id, do_delete=False) as p:
+ self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
+
+ def test_create_shared_queue_networks(self):
+ with self.qos_queue(default=True, no_delete=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ q1['qos_queue']['id'])
+ res = self._create_network('json', 'net2', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net2 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ q1['qos_queue']['id'])
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ res = self._create_port('json', net1['network']['id'],
+ device_id=device_id)
+ port1 = self.deserialize('json', res)
+ res = self._create_port('json', net2['network']['id'],
+ device_id=device_id)
+ port2 = self.deserialize('json', res)
+ self.assertEqual(port1['port'][ext_qos.QUEUE],
+ port2['port'][ext_qos.QUEUE])
+
+ self._delete('ports', port1['port']['id'])
+ self._delete('ports', port2['port']['id'])
+
+ def test_remove_queue_in_use_fail(self):
+ with self.qos_queue(no_delete=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ res = self._create_port('json', net1['network']['id'],
+ device_id=device_id)
+ port = self.deserialize('json', res)
+ self._delete('qos-queues', port['port'][ext_qos.QUEUE], 409)
+
+ def test_update_network_new_queue(self):
+ with self.qos_queue() as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ with self.qos_queue() as new_q:
+ data = {'network': {ext_qos.QUEUE: new_q['qos_queue']['id']}}
+ req = self.new_update_request('networks', data,
+ net1['network']['id'])
+ res = req.get_response(self.api)
+ net1 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ new_q['qos_queue']['id'])
+
+ def test_update_port_adding_device_id(self):
+ with self.qos_queue(no_delete=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ res = self._create_port('json', net1['network']['id'])
+ port = self.deserialize('json', res)
+ self.assertIsNone(port['port'][ext_qos.QUEUE])
+
+ data = {'port': {'device_id': device_id}}
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+
+ res = req.get_response(self.api)
+ port = self.deserialize('json', res)
+ self.assertEqual(len(port['port'][ext_qos.QUEUE]), 36)
+
+ def test_get_port_with_qos_not_admin(self):
+ body = {'qos_queue': {'tenant_id': 'not_admin',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body, tenant_id='not_admin')
+ q1 = self.deserialize('json', res)
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE, 'tenant_id',),
+ queue_id=q1['qos_queue']['id'],
+ tenant_id="not_admin")
+ net1 = self.deserialize('json', res)
+ self.assertEqual(len(net1['network'][ext_qos.QUEUE]), 36)
+ res = self._create_port('json', net1['network']['id'],
+ tenant_id='not_admin', set_context=True)
+
+ port = self.deserialize('json', res)
+ self.assertNotIn(ext_qos.QUEUE, port['port'])
+
+ def test_dscp_value_out_of_range(self):
+ body = {'qos_queue': {'tenant_id': 'admin', 'dscp': '64',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body)
+ self.assertEqual(res.status_int, 400)
+
+ def test_non_admin_cannot_create_queue(self):
+ body = {'qos_queue': {'tenant_id': 'not_admin',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body, tenant_id='not_admin',
+ set_context=True)
+ self.assertEqual(res.status_int, 403)
+
+ def test_update_port_non_admin_does_not_show_queue_id(self):
+ body = {'qos_queue': {'tenant_id': 'not_admin',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body, tenant_id='not_admin')
+ q1 = self.deserialize('json', res)
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ tenant_id='not_admin',
+ queue_id=q1['qos_queue']['id'])
+
+ net1 = self.deserialize('json', res)
+ res = self._create_port('json', net1['network']['id'],
+ tenant_id='not_admin', set_context=True)
+ port = self.deserialize('json', res)
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ data = {'port': {'device_id': device_id}}
+ neutron_context = context.Context('', 'not_admin')
+ port = self._update('ports', port['port']['id'], data,
+ neutron_context=neutron_context)
+ self.assertNotIn(ext_qos.QUEUE, port['port'])
+
+ def test_rxtx_factor(self):
+ with self.qos_queue(max=10) as q1:
+
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ res = self._create_port('json', net1['network']['id'],
+ arg_list=(ext_qos.RXTX_FACTOR,),
+ rxtx_factor=2, device_id='1')
+ port = self.deserialize('json', res)
+ req = self.new_show_request('qos-queues',
+ port['port'][ext_qos.QUEUE])
+ res = req.get_response(self.ext_api)
+ queue = self.deserialize('json', res)
+ self.assertEqual(queue['qos_queue']['max'], 20)
import mock
from neutron.plugins.nicira.common import config # noqa
-from neutron.plugins.nicira import nsx_cluster
-from neutron.plugins.nicira import NvpApiClient
-from neutron.plugins.nicira import nvplib
+from neutron.plugins.nicira import nsx_cluster as cluster
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
from neutron.tests.unit import test_api_v2
-from neutron.tests.unit.vmware import fake_nvpapiclient
+from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import STUBS_PATH
class NsxlibTestCase(base.BaseTestCase):
def setUp(self):
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
+ self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsxapi = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsxapi.start()
instance.return_value.login.return_value = "the_cookie"
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
- NvpApiClient.NVPVersion(fake_version))
+ api_client.NVPVersion(fake_version))
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- instance.return_value.request.side_effect = _fake_request
- self.fake_cluster = nsx_cluster.NSXCluster(
+ instance.return_value.request.side_effect = self.fc.fake_request
+ self.fake_cluster = cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
- self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
+ self.fake_cluster.api_client = api_client.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
class NsxlibNegativeBaseTestCase(base.BaseTestCase):
def setUp(self):
- # mock nsx api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
+ self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsxapi = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsxapi.start()
instance.return_value.login.return_value = "the_cookie"
# these tests as calls are throwing up errors anyway
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
- NvpApiClient.NVPVersion(fake_version))
+ api_client.NVPVersion(fake_version))
def _faulty_request(*args, **kwargs):
- raise nvplib.NvpApiClient.NvpApiException
+ raise api_client.NvpApiException
instance.return_value.request.side_effect = _faulty_request
- self.fake_cluster = nsx_cluster.NSXCluster(
+ self.fake_cluster = cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
- self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
+ self.fake_cluster.api_client = api_client.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
-from neutron.plugins.nicira import nvplib
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware.nsxlib import base
class L2GatewayNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_l2_gw_service_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
l2gwlib.create_l2_gw_service,
self.fake_cluster,
'fake-tenant',
'interface_name': 'xxx'}])
def test_delete_l2_gw_service_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
l2gwlib.delete_l2_gw_service,
self.fake_cluster,
'fake-gateway')
def test_get_l2_gw_service_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
l2gwlib.get_l2_gw_service,
self.fake_cluster,
'fake-gateway')
def test_update_l2_gw_service_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
l2gwlib.update_l2_gw_service,
self.fake_cluster,
'fake-gateway',
l2gwlib.plug_l2_gw_service(
self.fake_cluster, lswitch['uuid'],
lport['uuid'], gw_id)
- uri = nvplib._build_uri_path(nvplib.LSWITCHPORT_RESOURCE,
- lport['uuid'],
- lswitch['uuid'],
- is_attachment=True)
- resp_obj = nvplib.do_request("GET", uri,
- cluster=self.fake_cluster)
+ uri = l2gwlib._build_uri_path(switchlib.LSWITCHPORT_RESOURCE,
+ lport['uuid'],
+ lswitch['uuid'],
+ is_attachment=True)
+ resp_obj = l2gwlib.do_request("GET", uri,
+ cluster=self.fake_cluster)
self.assertIn('LogicalPortAttachment', resp_obj)
self.assertEqual(resp_obj['LogicalPortAttachment']['type'],
'L2GatewayAttachment')
import mock
from neutron.common import exceptions
-from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import lsn as lsnlib
-from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
lsn_id = "foo_lsn_id"
lsn_port_id = "foo_lsn_port_id"
lswitch_port_id = "foo_lswitch_port_id"
- self.mock_request.side_effect = NvpApiClient.Conflict
+ self.mock_request.side_effect = api_client.Conflict
self.assertRaises(
- nvp_exc.LsnConfigurationConflict,
+ nsx_exc.LsnConfigurationConflict,
lsnlib.lsn_port_plug_network,
self.cluster, lsn_id, lsn_port_id, lswitch_port_id)
from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import queue as queuelib
-from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests.unit.vmware.nsxlib import base
def test_create_lqueue_nsx_error_raises(self):
def raise_nsx_exc(*args, **kwargs):
- raise NvpApiClient.NvpApiException()
+ raise api_client.NvpApiException()
with mock.patch.object(queuelib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
from neutron.common import exceptions
from neutron.openstack.common import uuidutils
-from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
-from neutron.plugins.nicira import NvpApiClient
-from neutron.plugins.nicira import nvplib
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware.nsxlib import base
def _test_create_lrouter_dnat_rule(self, version):
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
- new=lambda: NvpApiClient.NVPVersion(version)):
+ new=lambda: api_client.NVPVersion(version)):
tenant_id = 'pippo'
lrouter = routerlib.create_lrouter(self.fake_cluster,
uuidutils.generate_uuid(),
self.fake_cluster, lrouter['uuid'], '10.0.0.99',
match_criteria={'destination_ip_addresses':
'192.168.0.5'})
- uri = nvplib._build_uri_path(routerlib.LROUTERNAT_RESOURCE,
- nat_rule['uuid'],
- lrouter['uuid'])
- resp_obj = nvplib.do_request("GET", uri, cluster=self.fake_cluster)
+ uri = routerlib._build_uri_path(routerlib.LROUTERNAT_RESOURCE,
+ nat_rule['uuid'],
+ lrouter['uuid'])
+ resp_obj = routerlib.do_request(
+ "GET", uri, cluster=self.fake_cluster)
self.assertEqual('DestinationNatRule', resp_obj['type'])
self.assertEqual('192.168.0.5',
resp_obj['match']['destination_ip_addresses'])
new_routes = [{"nexthop": "10.0.0.2",
"destination": "169.254.169.0/30"}, ]
- nvp_routes = [self._get_single_route(router_id)]
+ nsx_routes = [self._get_single_route(router_id)]
with mock.patch.object(routerlib, 'get_explicit_routes_lrouter',
- return_value=nvp_routes):
+ return_value=nsx_routes):
with mock.patch.object(routerlib, 'create_explicit_route_lrouter',
return_value='fake_uuid'):
old_routes = routerlib.update_explicit_routes_lrouter(
self.fake_cluster, router_id, new_routes)
- self.assertEqual(old_routes, nvp_routes)
+ self.assertEqual(old_routes, nsx_routes)
def test_update_lrouter_with_no_routes_raise_nsx_exception(self):
router_id = 'fake_router_id'
with mock.patch.object(routerlib, 'get_explicit_routes_lrouter',
return_value=nsx_routes):
with mock.patch.object(routerlib, 'create_explicit_route_lrouter',
- side_effect=NvpApiClient.NvpApiException):
- self.assertRaises(NvpApiClient.NvpApiException,
+ side_effect=api_client.NvpApiException):
+ self.assertRaises(api_client.NvpApiException,
routerlib.update_explicit_routes_lrouter,
self.fake_cluster, router_id, new_routes)
with mock.patch.object(routerlib, 'get_explicit_routes_lrouter',
return_value=nsx_routes):
with mock.patch.object(routerlib, 'delete_explicit_route_lrouter',
- side_effect=NvpApiClient.NvpApiException):
+ side_effect=api_client.NvpApiException):
with mock.patch.object(
routerlib, 'create_explicit_route_lrouter',
return_value='fake_uuid'):
self.assertRaises(
- NvpApiClient.NvpApiException,
+ api_client.NvpApiException,
routerlib.update_explicit_routes_lrouter,
self.fake_cluster, router_id, new_routes)
class RouterNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_lrouter_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
routerlib.create_lrouter,
self.fake_cluster,
uuidutils.generate_uuid(),
'my_hop')
def test_delete_lrouter_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
routerlib.delete_lrouter,
self.fake_cluster,
'fake_router')
def test_get_lrouter_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
routerlib.get_lrouter,
self.fake_cluster,
'fake_router')
def test_update_lrouter_on_failure(self):
- self.assertRaises(nvplib.NvpApiClient.NvpApiException,
+ self.assertRaises(api_client.NvpApiException,
routerlib.update_lrouter,
self.fake_cluster,
'fake_router',
def _create_lrouter(self, version, neutron_id=None, distributed=None):
with mock.patch.object(
self.fake_cluster.api_client, 'get_nvp_version',
- return_value=NvpApiClient.NVPVersion(version)):
+ return_value=api_client.NVPVersion(version)):
if not neutron_id:
neutron_id = uuidutils.generate_uuid()
lrouter = routerlib.create_lrouter(
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
- return_value=NvpApiClient.NVPVersion(version)):
+ return_value=api_client.NVPVersion(version)):
with mock.patch.dict(routerlib.ROUTER_FUNC_DICT,
foo_func_dict, clear=True):
return routerlib.update_lrouter(
'foo_nexthop', routes={'foo_destination': 'foo_address'})
def test_version_dependent_update_lrouter_old_versions(self):
- self.assertRaises(nvp_exc.NvpInvalidVersion,
+ self.assertRaises(nsx_exc.NvpInvalidVersion,
self._test_version_dependent_update_lrouter,
"2.9")
- self.assertRaises(nvp_exc.NvpInvalidVersion,
+ self.assertRaises(nsx_exc.NvpInvalidVersion,
self._test_version_dependent_update_lrouter,
"3.0")
- self.assertRaises(nvp_exc.NvpInvalidVersion,
+ self.assertRaises(nsx_exc.NvpInvalidVersion,
self._test_version_dependent_update_lrouter,
"3.1")
def test_update_lrouter_port_ips_nonexistent_router_raises(self):
self.assertRaises(
- nvp_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
+ nsx_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
self.fake_cluster, 'boo-router', 'boo-port', [], [])
def test_update_lrouter_port_ips_nsx_exception_raises(self):
'name', True, ['192.168.0.1'], '00:11:22:33:44:55')
def raise_nsx_exc(*args, **kwargs):
- raise NvpApiClient.NvpApiException()
+ raise api_client.NvpApiException()
with mock.patch.object(routerlib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
- nvp_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
+ nsx_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
self.fake_cluster, lrouter['uuid'],
lrouter_port['uuid'], [], [])
lrouter_port = routerlib.create_router_lport(
self.fake_cluster, lrouter['uuid'], 'pippo', 'neutron_port_id',
'name', True, ['192.168.0.1'], '00:11:22:33:44:55')
- self.assertRaises(nvp_exc.NvpInvalidAttachmentType,
+ self.assertRaises(nsx_exc.NvpInvalidAttachmentType,
routerlib.plug_router_port_attachment,
self.fake_cluster, lrouter['uuid'],
lrouter_port['uuid'], 'gw_att', 'BadType')
'10.0.0.1')
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
- new=lambda: NvpApiClient.NVPVersion(version)):
+ new=lambda: api_client.NVPVersion(version)):
routerlib.create_lrouter_snat_rule(
self.fake_cluster, lrouter['uuid'],
'10.0.0.2', '10.0.0.2', order=200,
'10.0.0.1')
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
- return_value=NvpApiClient.NVPVersion(version)):
+ return_value=api_client.NVPVersion(version)):
routerlib.create_lrouter_dnat_rule(
self.fake_cluster, lrouter['uuid'], '192.168.0.2', order=200,
dest_port=dest_port,
'10.0.0.1')
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
- new=lambda: NvpApiClient.NVPVersion(version)):
+ new=lambda: api_client.NVPVersion(version)):
routerlib.create_lrouter_nosnat_rule(
self.fake_cluster, lrouter['uuid'],
order=100,
# v2 or v3 makes no difference for this test
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
- new=lambda: NvpApiClient.NVPVersion('2.0')):
+ new=lambda: api_client.NVPVersion('2.0')):
routerlib.create_lrouter_snat_rule(
self.fake_cluster, lrouter['uuid'],
'10.0.0.2', '10.0.0.2', order=220,
rules = routerlib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
self.assertEqual(len(rules), 3)
self.assertRaises(
- nvp_exc.NvpNatRuleMismatch,
+ nsx_exc.NvpNatRuleMismatch,
routerlib.delete_nat_rules_by_match,
self.fake_cluster, lrouter['uuid'],
'SomeWeirdType', 1, 1)
from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib
-from neutron.plugins.nicira import nvplib
+from neutron.plugins.nicira import nvplib as nsx_utils
from neutron.tests.unit.vmware.nsxlib import base
def test_create_and_get_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'test'})
- sec_prof_res = nvplib.do_request(
+ sec_prof_res = secgrouplib.do_request(
secgrouplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ nsx_utils._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
def test_create_and_get_default_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'default'})
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ sec_prof_res = nsx_utils.do_request(
+ secgrouplib.HTTP_GET,
+ nsx_utils._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
'logical_port_ingress_rules': [ingress_rule]}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ sec_prof_res = nsx_utils.do_request(
+ secgrouplib.HTTP_GET,
+ nsx_utils._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
'logical_port_ingress_rules': []}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ sec_prof_res = nsx_utils.do_request(
+ nsx_utils.HTTP_GET,
+ nsx_utils._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
secgrouplib.delete_security_profile(
self.fake_cluster, sec_prof['uuid'])
self.assertRaises(exceptions.NotFound,
- nvplib.do_request,
- nvplib.HTTP_GET,
- nvplib._build_uri_path(
+ secgrouplib.do_request,
+ secgrouplib.HTTP_GET,
+ nsx_utils._build_uri_path(
'security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import versioning
-from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
class TestVersioning(base.BaseTestCase):
def test_function_handling_missing_minor(self):
- version = NvpApiClient.NVPVersion('2.0')
+ version = api_client.NVPVersion('2.0')
function = versioning.get_function_by_version(
routerlib.ROUTER_FUNC_DICT, 'create_lrouter', version)
self.assertEqual(routerlib.create_implicit_routing_lrouter,
function)
def test_function_handling_with_both_major_and_minor(self):
- version = NvpApiClient.NVPVersion('3.2')
+ version = api_client.NVPVersion('3.2')
function = versioning.get_function_by_version(
routerlib.ROUTER_FUNC_DICT, 'create_lrouter', version)
self.assertEqual(routerlib.create_explicit_routing_lrouter,
function)
def test_function_handling_with_newer_major(self):
- version = NvpApiClient.NVPVersion('5.2')
+ version = api_client.NVPVersion('5.2')
function = versioning.get_function_by_version(
routerlib.ROUTER_FUNC_DICT, 'create_lrouter', version)
self.assertEqual(routerlib.create_explicit_routing_lrouter,
function)
def test_function_handling_with_obsolete_major(self):
- version = NvpApiClient.NVPVersion('1.2')
+ version = api_client.NVPVersion('1.2')
self.assertRaises(NotImplementedError,
versioning.get_function_by_version,
routerlib.ROUTER_FUNC_DICT,
'create_lrouter', version)
def test_function_handling_with_unknown_version(self):
- self.assertRaises(NvpApiClient.ServiceUnavailable,
+ self.assertRaises(api_client.ServiceUnavailable,
versioning.get_function_by_version,
routerlib.ROUTER_FUNC_DICT,
'create_lrouter', None)
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.dhcp_meta import rpc
from neutron.tests.unit.openvswitch import test_agent_scheduler as test_base
-from neutron.tests.unit.vmware import fake_nvpapiclient
+from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import PLUGIN_NAME
from neutron.tests.unit.vmware import STUBS_PATH
-class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase):
+class DhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase):
plugin_str = PLUGIN_NAME
def setUp(self):
test_config['config_files'] = [get_fake_conf('nsx.ini.full.test')]
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
- instance = self.mock_nvpapi.start()
+ # mock api client
+ self.fc = fake.FakeClient(STUBS_PATH)
+ self.mock_nsx_api = mock.patch(NSXAPI_NAME, autospec=True)
+ instance = self.mock_nsx_api.start()
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- # Emulate tests against NVP 2.x
+ # Emulate tests against NSX 2.x
instance.return_value.get_nvp_version.return_value = "2.999"
- instance.return_value.request.side_effect = _fake_request
- super(NVPDhcpAgentNotifierTestCase, self).setUp()
+ instance.return_value.request.side_effect = self.fc.fake_request
+ super(DhcpAgentNotifierTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(patch_sync.stop)
- self.addCleanup(self.mock_nvpapi.stop)
+ self.addCleanup(self.mock_nsx_api.stop)
self.addCleanup(cfg.CONF.reset)
def _test_gateway_subnet_notification(self, gateway='10.0.0.1'):
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira import nsx_cluster
from neutron.plugins.nicira.nsxlib import lsn as lsnlib
-from neutron.plugins.nicira import NvpApiClient as nvp_client
+from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import PLUGIN_NAME
cfg.CONF.set_override('core_plugin', PLUGIN_NAME)
self.assertEqual(config.AgentModes.AGENTLESS,
cfg.CONF.NSX.agent_mode)
- # The version returned from NVP does not really matter here
- with mock.patch.object(nvp_client.NVPApiHelper,
+ # The version returned from NSX does not really matter here
+ with mock.patch.object(api_client.NVPApiHelper,
'get_nvp_version',
- return_value=nvp_client.NVPVersion("9.9")):
+ return_value=api_client.NVPVersion("9.9")):
with mock.patch.object(lsnlib,
'service_cluster_exists',
return_value=True):
cfg.CONF.set_override('core_plugin', PLUGIN_NAME)
self.assertEqual(config.AgentModes.AGENTLESS,
cfg.CONF.NSX.agent_mode)
- with mock.patch.object(nvp_client.NVPApiHelper,
+ with mock.patch.object(api_client.NVPApiHelper,
'get_nvp_version',
- return_value=nvp_client.NVPVersion("3.2")):
+ return_value=api_client.NVPVersion("3.2")):
self.assertRaises(exceptions.NvpPluginException, NeutronManager)
def test_agentless_extensions_unmet_deps_fail(self):
cfg.CONF.set_override('core_plugin', PLUGIN_NAME)
self.assertEqual(config.AgentModes.AGENTLESS,
cfg.CONF.NSX.agent_mode)
- with mock.patch.object(nvp_client.NVPApiHelper,
+ with mock.patch.object(api_client.NVPApiHelper,
'get_nvp_version',
- return_value=nvp_client.NVPVersion("3.2")):
+ return_value=api_client.NVPVersion("3.2")):
with mock.patch.object(lsnlib,
'service_cluster_exists',
return_value=False):
from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import l3_ext_gw_mode
-from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron.extensions import securitygroup as secgrp
from neutron import manager
from neutron.manager import NeutronManager
from neutron.openstack.common.db import exception as db_exc
+from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
-from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import sync
-from neutron.plugins.nicira.dbexts import nicira_db
-from neutron.plugins.nicira.dbexts import qos_db
+from neutron.plugins.nicira.dbexts import nicira_db as nsx_db
from neutron.plugins.nicira.extensions import distributedrouter as dist_router
-from neutron.plugins.nicira.extensions import networkgw
-from neutron.plugins.nicira.extensions import qos
from neutron.plugins.nicira import NeutronPlugin
from neutron.plugins.nicira import nsxlib
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira.NvpApiClient import NVPVersion
from neutron.tests.unit import _test_extension_portbindings as test_bindings
import neutron.tests.unit.test_db_plugin as test_plugin
-import neutron.tests.unit.test_extension_allowedaddresspairs as test_addr_pair
import neutron.tests.unit.test_extension_ext_gw_mode as test_ext_gw_mode
-import neutron.tests.unit.test_extension_portsecurity as psec
import neutron.tests.unit.test_extension_security_group as ext_sg
-from neutron.tests.unit import test_extensions
import neutron.tests.unit.test_l3_plugin as test_l3_plugin
from neutron.tests.unit import testlib_api
-from neutron.tests.unit.vmware import fake_nvpapiclient
+from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import NSXEXT_PATH
from neutron.tests.unit.vmware import PLUGIN_NAME
from neutron.tests.unit.vmware import STUBS_PATH
-import neutron.tests.unit.vmware.test_networkgw as test_l2_gw
-
-from neutron.openstack.common import log
LOG = log.getLogger(__name__)
-class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
+class NsxPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
def _create_network(self, fmt, name, admin_state_up,
arg_list=None, providernet_args=None, **kwargs):
ext_mgr=None,
service_plugins=None):
test_lib.test_config['config_files'] = [get_fake_conf('nsx.ini.test')]
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
- self.mock_instance = self.mock_nvpapi.start()
+ # mock api client
+ self.fc = fake.FakeClient(STUBS_PATH)
+ self.mock_nsx = mock.patch(NSXAPI_NAME, autospec=True)
+ self.mock_instance = self.mock_nsx.start()
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- # Emulate tests against NVP 2.x
+ # Emulate tests against NSX 2.x
self.mock_instance.return_value.get_nvp_version.return_value = (
NVPVersion("2.9"))
- self.mock_instance.return_value.request.side_effect = _fake_request
- super(NiciraPluginV2TestCase, self).setUp(plugin=plugin,
- ext_mgr=ext_mgr)
+ self.mock_instance.return_value.request.side_effect = (
+ self.fc.fake_request)
+ super(NsxPluginV2TestCase, self).setUp(plugin=plugin,
+ ext_mgr=ext_mgr)
cfg.CONF.set_override('metadata_mode', None, 'NSX')
self.addCleanup(self.fc.reset_all)
self.addCleanup(mock.patch.stopall)
-class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase):
+class TestBasicGet(test_plugin.TestBasicGet, NsxPluginV2TestCase):
pass
-class TestNiciraV2HTTPResponse(test_plugin.TestV2HTTPResponse,
- NiciraPluginV2TestCase):
+class TestV2HTTPResponse(test_plugin.TestV2HTTPResponse, NsxPluginV2TestCase):
pass
-class TestNiciraProvidernet(NiciraPluginV2TestCase):
-
- def test_create_provider_network_default_physical_net(self):
- data = {'network': {'name': 'net1',
- 'admin_state_up': True,
- 'tenant_id': 'admin',
- pnet.NETWORK_TYPE: 'vlan',
- pnet.SEGMENTATION_ID: 411}}
- network_req = self.new_create_request('networks', data, self.fmt)
- net = self.deserialize(self.fmt, network_req.get_response(self.api))
- self.assertEqual(net['network'][pnet.NETWORK_TYPE], 'vlan')
- self.assertEqual(net['network'][pnet.SEGMENTATION_ID], 411)
-
- def test_create_provider_network(self):
- data = {'network': {'name': 'net1',
- 'admin_state_up': True,
- 'tenant_id': 'admin',
- pnet.NETWORK_TYPE: 'vlan',
- pnet.SEGMENTATION_ID: 411,
- pnet.PHYSICAL_NETWORK: 'physnet1'}}
- network_req = self.new_create_request('networks', data, self.fmt)
- net = self.deserialize(self.fmt, network_req.get_response(self.api))
- self.assertEqual(net['network'][pnet.NETWORK_TYPE], 'vlan')
- self.assertEqual(net['network'][pnet.SEGMENTATION_ID], 411)
- self.assertEqual(net['network'][pnet.PHYSICAL_NETWORK], 'physnet1')
-
-
-class TestNiciraPortsV2(NiciraPluginV2TestCase,
- test_plugin.TestPortsV2,
- test_bindings.PortBindingsTestCase,
- test_bindings.PortBindingsHostTestCaseMixin):
+class TestPortsV2(NsxPluginV2TestCase,
+ test_plugin.TestPortsV2,
+ test_bindings.PortBindingsTestCase,
+ test_bindings.PortBindingsHostTestCaseMixin):
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
self.assertEqual(len(ls), 2)
def test_update_port_delete_ip(self):
- # This test case overrides the default because the nvp plugin
+ # This test case overrides the default because the nsx plugin
# implements port_security/security groups and it is not allowed
# to remove an ip address from a port unless the security group
# is first removed.
query_params = "network_id=%s" % net_id
self._test_list_resources('port', [],
query_params=query_params)
- # Also verify no orphan port was left on nvp
+ # Also verify no orphan port was left on nsx
# no port should be there at all
self.assertFalse(self.fc._fake_lswitch_lport_dict)
- def test_create_port_nvp_error_no_orphan_left(self):
+ def test_create_port_nsx_error_no_orphan_left(self):
with mock.patch.object(nsxlib.switch, 'create_lport',
side_effect=NvpApiClient.NvpApiException):
with self.network() as net:
self._verify_no_orphan_left(net_id)
def test_create_port_neutron_error_no_orphan_left(self):
- with mock.patch.object(nicira_db, 'add_neutron_nsx_port_mapping',
+ with mock.patch.object(nsx_db, 'add_neutron_nsx_port_mapping',
side_effect=ntn_exc.NeutronException):
with self.network() as net:
net_id = net['network']['id']
inner_exception=sql_exc.IntegrityError(mock.ANY,
mock.ANY,
mock.ANY))
- with mock.patch.object(nicira_db, 'add_neutron_nsx_port_mapping',
+ with mock.patch.object(nsx_db, 'add_neutron_nsx_port_mapping',
side_effect=db_exception):
with self.network() as net:
with self.port(device_owner='network:dhcp'):
def test_create_port_maintenance_returns_503(self):
with self.network() as net:
with mock.patch.object(nsxlib.switch, 'do_request',
- side_effect=nvp_exc.MaintenanceInProgress):
+ side_effect=nsx_exc.MaintenanceInProgress):
data = {'port': {'network_id': net['network']['id'],
'admin_state_up': False,
'fixed_ips': [],
res.status_int)
-class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
- NiciraPluginV2TestCase):
+class TestNetworksV2(test_plugin.TestNetworksV2, NsxPluginV2TestCase):
def _test_create_bridge_network(self, vlan_id=None):
net_type = vlan_id and 'vlan' or 'flat'
def test_list_networks_filter_by_id(self):
# We add this unit test to cover some logic specific to the
- # nvp plugin
+ # nsx plugin
with contextlib.nested(self.network(name='net1'),
self.network(name='net2')) as (net1, net2):
query_params = 'id=%s' % net1['network']['id']
'admin_state_up': True,
'tenant_id': self._tenant_id}}
with mock.patch.object(nsxlib.switch, 'do_request',
- side_effect=nvp_exc.MaintenanceInProgress):
+ side_effect=nsx_exc.MaintenanceInProgress):
net_req = self.new_create_request('networks', data, self.fmt)
res = net_req.get_response(self.api)
self.assertEqual(webob.exc.HTTPServiceUnavailable.code,
net['network']['id'], data)
-class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase):
-
- def setUp(self):
- test_lib.test_config['config_files'] = [get_fake_conf('nsx.ini.test')]
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
- instance = self.mock_nvpapi.start()
- instance.return_value.login.return_value = "the_cookie"
- # Avoid runs of the synchronizer looping call
- patch_sync = mock.patch.object(sync, '_start_loopingcall')
- patch_sync.start()
-
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- instance.return_value.request.side_effect = _fake_request
- super(NiciraPortSecurityTestCase, self).setUp(PLUGIN_NAME)
- self.addCleanup(self.fc.reset_all)
- self.addCleanup(self.mock_nvpapi.stop)
- self.addCleanup(patch_sync.stop)
-
-
-class TestNiciraPortSecurity(NiciraPortSecurityTestCase,
- psec.TestPortSecurity):
- pass
-
-
-class TestNiciraAllowedAddressPairs(NiciraPluginV2TestCase,
- test_addr_pair.TestAllowedAddressPairs):
- pass
-
-
-class NiciraSecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase):
+class SecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase):
def setUp(self):
test_lib.test_config['config_files'] = [get_fake_conf('nsx.ini.test')]
- # mock nvp api client
- fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
- instance = self.mock_nvpapi.start()
+ # mock nsx api client
+ self.fc = fake.FakeClient(STUBS_PATH)
+ self.mock_nsx = mock.patch(NSXAPI_NAME, autospec=True)
+ instance = self.mock_nsx.start()
instance.return_value.login.return_value = "the_cookie"
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
- def _fake_request(*args, **kwargs):
- return fc.fake_request(*args, **kwargs)
-
- instance.return_value.request.side_effect = _fake_request
- self.addCleanup(self.mock_nvpapi.stop)
+ instance.return_value.request.side_effect = self.fc.fake_request
+ self.addCleanup(self.mock_nsx.stop)
self.addCleanup(patch_sync.stop)
- super(NiciraSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
+ super(SecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
-class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups,
- NiciraSecurityGroupsTestCase):
+class TestSecurityGroup(ext_sg.TestSecurityGroups, SecurityGroupsTestCase):
def test_create_security_group_name_exceeds_40_chars(self):
name = 'this_is_a_secgroup_whose_name_is_longer_than_40_chars'
self.assertEqual(res.status_int, 400)
-class TestNiciraL3ExtensionManager(object):
+class TestL3ExtensionManager(object):
def get_resources(self):
# Simulate extension of L3 attribute map
return []
-class TestNiciraL3SecGrpExtensionManager(TestNiciraL3ExtensionManager):
+class TestL3SecGrpExtensionManager(TestL3ExtensionManager):
"""A fake extension manager for L3 and Security Group extensions.
- Includes also Nicira-specific L3 attributes.
+ Includes also NSX specific L3 attributes.
"""
def get_resources(self):
- resources = super(TestNiciraL3SecGrpExtensionManager,
+ resources = super(TestL3SecGrpExtensionManager,
self).get_resources()
resources.extend(secgrp.Securitygroup.get_resources())
return resources
l3.RESOURCE_ATTRIBUTE_MAP = map_to_restore
-class NiciraL3NatTest(test_l3_plugin.L3BaseForIntTests,
- NiciraPluginV2TestCase):
+class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxPluginV2TestCase):
def _restore_l3_attribute_map(self):
l3.RESOURCE_ATTRIBUTE_MAP = self._l3_attribute_map_bk
cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
l3_attribute_map_bk = backup_l3_attribute_map()
self.addCleanup(restore_l3_attribute_map, l3_attribute_map_bk)
- ext_mgr = ext_mgr or TestNiciraL3ExtensionManager()
- super(NiciraL3NatTest, self).setUp(
+ ext_mgr = ext_mgr or TestL3ExtensionManager()
+ super(L3NatTest, self).setUp(
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins)
plugin_instance = NeutronManager.get_plugin()
self._plugin_name = "%s.%s" % (
self._plugin_class = plugin_instance.__class__
-class TestNiciraL3NatTestCase(NiciraL3NatTest,
- test_l3_plugin.L3NatDBIntTestCase,
- NiciraPluginV2TestCase):
+class TestL3NatTestCase(L3NatTest,
+ test_l3_plugin.L3NatDBIntTestCase,
+ NsxPluginV2TestCase):
def _create_l3_ext_network(self, vlan_id=None):
name = 'l3_ext_net'
for k, v in expected:
self.assertEqual(net['network'][k], v)
- def _nvp_validate_ext_gw(self, router_id, l3_gw_uuid, vlan_id):
- """Verify data on fake NVP API client in order to validate
+ def _nsx_validate_ext_gw(self, router_id, l3_gw_uuid, vlan_id):
+ """Verify data on fake NSX API client in order to validate
plugin did set them properly
"""
# First find the NSX router ID
ctx = context.get_admin_context()
- nsx_router_id = nicira_db.get_nsx_router_id(ctx.session, router_id)
+ nsx_router_id = nsx_db.get_nsx_router_id(ctx.session, router_id)
ports = [port for port in self.fc._fake_lrouter_lport_dict.values()
if (port['lr_uuid'] == nsx_router_id and
port['att_type'] == "L3GatewayAttachment")]
(router['router']['external_gateway_info']
['network_id']))
if validate_ext_gw:
- self._nvp_validate_ext_gw(router['router']['id'],
+ self._nsx_validate_ext_gw(router['router']['id'],
'l3_gw_uuid', vlan_id)
finally:
self._delete('routers', router['router']['id'])
def test_router_create_distributed_with_3_1(self):
self._test_router_create_with_distributed(True, True)
- def test_router_create_distributed_with_new_nvp_versions(self):
+ def test_router_create_distributed_with_new_nsx_versions(self):
with mock.patch.object(nsxlib.router, 'create_explicit_route_lrouter'):
self._test_router_create_with_distributed(True, True, '3.2')
self._test_router_create_with_distributed(True, True, '4.0')
'routers', data, self.fmt)
return router_req.get_response(self.ext_api)
- def test_router_create_nvp_error_returns_500(self, vlan_id=None):
+ def test_router_create_nsx_error_returns_500(self, vlan_id=None):
with mock.patch.object(nsxlib.router,
'create_router_lport',
side_effect=NvpApiClient.NvpApiException):
def test_router_add_gateway_invalid_network_returns_404(self):
# NOTE(salv-orlando): This unit test has been overriden
- # as the nicira plugin support the ext_gw_mode extension
+ # as the nsx plugin support the ext_gw_mode extension
# which mandates a uuid for the external network identifier
with self.router() as r:
self._add_external_gateway_to_router(
# TODO(salv-orlando): Verify whehter this is thread-safe
# w.r.t. sqllite and parallel testing
self._test_list_resources('router', [])
- # Check that router is not in NVP
+ # Check that router is not in NSX
self.assertFalse(self.fc._fake_lrouter_dict)
def test_router_create_with_gw_info_neutron_fail_does_rollback(self):
res.status_int)
self._verify_router_rollback()
- def test_router_create_with_gw_info_nvp_fail_does_rollback(self):
- # Simulate error while fetching nvp router gw port
+ def test_router_create_with_gw_info_nsx_fail_does_rollback(self):
+ # Simulate error while fetching nsx router gw port
with mock.patch.object(self._plugin_class,
'_find_router_gw_port',
side_effect=NvpApiClient.NvpApiException):
self.assertEqual(net_id,
s2['subnet']['network_id'])
if validate_ext_gw:
- self._nvp_validate_ext_gw(
+ self._nsx_validate_ext_gw(
body['router']['id'],
'l3_gw_uuid', vlan_id)
finally:
def test_floatingip_with_invalid_create_port(self):
self._test_floatingip_with_invalid_create_port(self._plugin_name)
- def _nvp_metadata_setup(self):
+ def _metadata_setup(self):
cfg.CONF.set_override('metadata_mode', 'access_network', 'NSX')
- def _nvp_metadata_teardown(self):
+ def _metadata_teardown(self):
cfg.CONF.set_override('metadata_mode', None, 'NSX')
def test_create_router_name_exceeds_40_chars(self):
self.assertEqual(rtr['router']['name'], name)
def test_router_add_interface_subnet_with_metadata_access(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_add_interface_subnet()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_add_interface_port_with_metadata_access(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_add_interface_port()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_add_interface_dupsubnet_returns_400_with_metadata(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_add_interface_dup_subnet1_returns_400()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_add_interface_overlapped_cidr_returns_400_with(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_add_interface_overlapped_cidr_returns_400()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_remove_interface_inuse_returns_409_with_metadata(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_remove_interface_inuse_returns_409()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_remove_iface_wrong_sub_returns_400_with_metadata(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_remove_interface_wrong_subnet_returns_400()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_delete_with_metadata_access(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_delete()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_router_delete_with_port_existed_returns_409_with_metadata(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
self.test_router_delete_with_port_existed_returns_409()
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_metadatata_network_created_with_router_interface_add(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
with self.router() as r:
with self.subnet() as s:
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_metadata_network_create_rollback_on_create_subnet_failure(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
with self.router() as r:
with self.subnet() as s:
# Raise a NeutronException (eg: NotFound)
r['router']['id'],
s['subnet']['id'],
None)
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_metadata_network_create_rollback_on_add_rtr_iface_failure(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
with self.router() as r:
with self.subnet() as s:
# Raise a NeutronException when adding metadata subnet
r['router']['id'],
s['subnet']['id'],
None)
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_metadata_network_removed_with_router_interface_remove(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
with self.router() as r:
with self.subnet() as s:
self._router_interface_action('add', r['router']['id'],
webob.exc.HTTPNotFound.code)
self._show('subnets', meta_sub_id,
webob.exc.HTTPNotFound.code)
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_metadata_network_remove_rollback_on_failure(self):
- self._nvp_metadata_setup()
+ self._metadata_setup()
with self.router() as r:
with self.subnet() as s:
self._router_interface_action('add', r['router']['id'],
webob.exc.HTTPOk.code)
self._show('ports', meta_port_id,
webob.exc.HTTPOk.code)
- self._nvp_metadata_teardown()
+ self._metadata_teardown()
def test_metadata_dhcp_host_route(self):
cfg.CONF.set_override('metadata_mode', 'dhcp_host_route', 'NSX')
with mock.patch.object(
nsxlib.router,
'do_request',
- side_effect=nvp_exc.MaintenanceInProgress):
+ side_effect=nsx_exc.MaintenanceInProgress):
data = {'router': {'tenant_id': 'whatever'}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {
res.status_int)
-class NvpQoSTestExtensionManager(object):
-
- def get_resources(self):
- return qos.Qos.get_resources()
-
- def get_actions(self):
- return []
-
- def get_request_extensions(self):
- return []
-
-
-class TestQoSQueue(NiciraPluginV2TestCase):
-
- def setUp(self, plugin=None):
- cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
- super(TestQoSQueue, self).setUp()
- ext_mgr = NvpQoSTestExtensionManager()
- self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
-
- def _create_qos_queue(self, fmt, body, **kwargs):
- qos_queue = self.new_create_request('qos-queues', body)
- if (kwargs.get('set_context') and 'tenant_id' in kwargs):
- # create a specific auth context for this request
- qos_queue.environ['neutron.context'] = context.Context(
- '', kwargs['tenant_id'])
-
- return qos_queue.get_response(self.ext_api)
-
- @contextlib.contextmanager
- def qos_queue(self, name='foo', min='0', max='10',
- qos_marking=None, dscp='0', default=None, no_delete=False):
-
- body = {'qos_queue': {'tenant_id': 'tenant',
- 'name': name,
- 'min': min,
- 'max': max}}
-
- if qos_marking:
- body['qos_queue']['qos_marking'] = qos_marking
- if dscp:
- body['qos_queue']['dscp'] = dscp
- if default:
- body['qos_queue']['default'] = default
- res = self._create_qos_queue('json', body)
- qos_queue = self.deserialize('json', res)
- if res.status_int >= 400:
- raise webob.exc.HTTPClientError(code=res.status_int)
- try:
- yield qos_queue
- finally:
- if not no_delete:
- self._delete('qos-queues',
- qos_queue['qos_queue']['id'])
-
- def test_create_qos_queue(self):
- with self.qos_queue(name='fake_lqueue', min=34, max=44,
- qos_marking='untrusted', default=False) as q:
- self.assertEqual(q['qos_queue']['name'], 'fake_lqueue')
- self.assertEqual(q['qos_queue']['min'], 34)
- self.assertEqual(q['qos_queue']['max'], 44)
- self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
- self.assertFalse(q['qos_queue']['default'])
-
- def test_create_trusted_qos_queue(self):
- with mock.patch.object(qos_db.LOG, 'info') as log:
- with mock.patch.object(nsxlib.queue, 'do_request',
- return_value={"uuid": "fake_queue"}):
- with self.qos_queue(name='fake_lqueue', min=34, max=44,
- qos_marking='trusted', default=False) as q:
- self.assertIsNone(q['qos_queue']['dscp'])
- self.assertTrue(log.called)
-
- def test_create_qos_queue_name_exceeds_40_chars(self):
- name = 'this_is_a_queue_whose_name_is_longer_than_40_chars'
- with self.qos_queue(name=name) as queue:
- # Assert Neutron name is not truncated
- self.assertEqual(queue['qos_queue']['name'], name)
-
- def test_create_qos_queue_default(self):
- with self.qos_queue(default=True) as q:
- self.assertTrue(q['qos_queue']['default'])
-
- def test_create_qos_queue_two_default_queues_fail(self):
- with self.qos_queue(default=True):
- body = {'qos_queue': {'tenant_id': 'tenant',
- 'name': 'second_default_queue',
- 'default': True}}
- res = self._create_qos_queue('json', body)
- self.assertEqual(res.status_int, 409)
-
- def test_create_port_with_queue(self):
- with self.qos_queue(default=True) as q1:
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net1 = self.deserialize('json', res)
- self.assertEqual(net1['network'][qos.QUEUE],
- q1['qos_queue']['id'])
- device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
- with self.port(device_id=device_id, do_delete=False) as p:
- self.assertEqual(len(p['port'][qos.QUEUE]), 36)
-
- def test_create_shared_queue_networks(self):
- with self.qos_queue(default=True, no_delete=True) as q1:
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net1 = self.deserialize('json', res)
- self.assertEqual(net1['network'][qos.QUEUE],
- q1['qos_queue']['id'])
- res = self._create_network('json', 'net2', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net2 = self.deserialize('json', res)
- self.assertEqual(net1['network'][qos.QUEUE],
- q1['qos_queue']['id'])
- device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
- res = self._create_port('json', net1['network']['id'],
- device_id=device_id)
- port1 = self.deserialize('json', res)
- res = self._create_port('json', net2['network']['id'],
- device_id=device_id)
- port2 = self.deserialize('json', res)
- self.assertEqual(port1['port'][qos.QUEUE],
- port2['port'][qos.QUEUE])
-
- self._delete('ports', port1['port']['id'])
- self._delete('ports', port2['port']['id'])
-
- def test_remove_queue_in_use_fail(self):
- with self.qos_queue(no_delete=True) as q1:
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net1 = self.deserialize('json', res)
- device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
- res = self._create_port('json', net1['network']['id'],
- device_id=device_id)
- port = self.deserialize('json', res)
- self._delete('qos-queues', port['port'][qos.QUEUE], 409)
-
- def test_update_network_new_queue(self):
- with self.qos_queue() as q1:
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net1 = self.deserialize('json', res)
- with self.qos_queue() as new_q:
- data = {'network': {qos.QUEUE: new_q['qos_queue']['id']}}
- req = self.new_update_request('networks', data,
- net1['network']['id'])
- res = req.get_response(self.api)
- net1 = self.deserialize('json', res)
- self.assertEqual(net1['network'][qos.QUEUE],
- new_q['qos_queue']['id'])
-
- def test_update_port_adding_device_id(self):
- with self.qos_queue(no_delete=True) as q1:
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net1 = self.deserialize('json', res)
- device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
- res = self._create_port('json', net1['network']['id'])
- port = self.deserialize('json', res)
- self.assertIsNone(port['port'][qos.QUEUE])
-
- data = {'port': {'device_id': device_id}}
- req = self.new_update_request('ports', data,
- port['port']['id'])
-
- res = req.get_response(self.api)
- port = self.deserialize('json', res)
- self.assertEqual(len(port['port'][qos.QUEUE]), 36)
-
- def test_get_port_with_qos_not_admin(self):
- body = {'qos_queue': {'tenant_id': 'not_admin',
- 'name': 'foo', 'min': 20, 'max': 20}}
- res = self._create_qos_queue('json', body, tenant_id='not_admin')
- q1 = self.deserialize('json', res)
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE, 'tenant_id',),
- queue_id=q1['qos_queue']['id'],
- tenant_id="not_admin")
- net1 = self.deserialize('json', res)
- self.assertEqual(len(net1['network'][qos.QUEUE]), 36)
- res = self._create_port('json', net1['network']['id'],
- tenant_id='not_admin', set_context=True)
-
- port = self.deserialize('json', res)
- self.assertNotIn(qos.QUEUE, port['port'])
-
- def test_dscp_value_out_of_range(self):
- body = {'qos_queue': {'tenant_id': 'admin', 'dscp': '64',
- 'name': 'foo', 'min': 20, 'max': 20}}
- res = self._create_qos_queue('json', body)
- self.assertEqual(res.status_int, 400)
-
- def test_non_admin_cannot_create_queue(self):
- body = {'qos_queue': {'tenant_id': 'not_admin',
- 'name': 'foo', 'min': 20, 'max': 20}}
- res = self._create_qos_queue('json', body, tenant_id='not_admin',
- set_context=True)
- self.assertEqual(res.status_int, 403)
-
- def test_update_port_non_admin_does_not_show_queue_id(self):
- body = {'qos_queue': {'tenant_id': 'not_admin',
- 'name': 'foo', 'min': 20, 'max': 20}}
- res = self._create_qos_queue('json', body, tenant_id='not_admin')
- q1 = self.deserialize('json', res)
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- tenant_id='not_admin',
- queue_id=q1['qos_queue']['id'])
-
- net1 = self.deserialize('json', res)
- res = self._create_port('json', net1['network']['id'],
- tenant_id='not_admin', set_context=True)
- port = self.deserialize('json', res)
- device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
- data = {'port': {'device_id': device_id}}
- neutron_context = context.Context('', 'not_admin')
- port = self._update('ports', port['port']['id'], data,
- neutron_context=neutron_context)
- self.assertNotIn(qos.QUEUE, port['port'])
-
- def test_rxtx_factor(self):
- with self.qos_queue(max=10) as q1:
-
- res = self._create_network('json', 'net1', True,
- arg_list=(qos.QUEUE,),
- queue_id=q1['qos_queue']['id'])
- net1 = self.deserialize('json', res)
- res = self._create_port('json', net1['network']['id'],
- arg_list=(qos.RXTX_FACTOR,),
- rxtx_factor=2, device_id='1')
- port = self.deserialize('json', res)
- req = self.new_show_request('qos-queues',
- port['port'][qos.QUEUE])
- res = req.get_response(self.ext_api)
- queue = self.deserialize('json', res)
- self.assertEqual(queue['qos_queue']['max'], 20)
-
-
-class NiciraExtGwModeTestCase(NiciraPluginV2TestCase,
- test_ext_gw_mode.ExtGwModeIntTestCase):
+class ExtGwModeTestCase(NsxPluginV2TestCase,
+ test_ext_gw_mode.ExtGwModeIntTestCase):
pass
-class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase,
- test_l3_plugin.L3NatTestCaseMixin,
- ext_sg.SecurityGroupsTestCase):
+class NeutronNsxOutOfSync(NsxPluginV2TestCase,
+ test_l3_plugin.L3NatTestCaseMixin,
+ ext_sg.SecurityGroupsTestCase):
def setUp(self):
l3_attribute_map_bk = backup_l3_attribute_map()
self.addCleanup(restore_l3_attribute_map, l3_attribute_map_bk)
- super(NiciraNeutronNVPOutOfSync, self).setUp(
- ext_mgr=TestNiciraL3SecGrpExtensionManager())
+ super(NeutronNsxOutOfSync, self).setUp(
+ ext_mgr=TestL3SecGrpExtensionManager())
- def test_delete_network_not_in_nvp(self):
+ def test_delete_network_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
self.fc._fake_lswitch_dict.clear()
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
- def test_show_network_not_in_nvp(self):
+ def test_show_network_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net = self.deserialize('json', res)
self.fc._fake_lswitch_dict.clear()
self.assertEqual(net['network']['status'],
constants.NET_STATUS_ERROR)
- def test_delete_port_not_in_nvp(self):
+ def test_delete_port_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
- def test_show_port_not_in_nvp(self):
+ def test_show_port_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'])
self.assertEqual(net['port']['status'],
constants.PORT_STATUS_ERROR)
- def test_create_port_on_network_not_in_nvp(self):
+ def test_create_port_on_network_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
self.fc._fake_lswitch_dict.clear()
port = self.deserialize('json', res)
self.assertEqual(port['port']['status'], constants.PORT_STATUS_ERROR)
- def test_update_port_not_in_nvp(self):
+ def test_update_port_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'])
self.assertEqual(port['port']['status'], constants.PORT_STATUS_ERROR)
self.assertEqual(port['port']['name'], 'error_port')
- def test_delete_port_and_network_not_in_nvp(self):
+ def test_delete_port_and_network_not_in_nsx(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
- def test_delete_router_not_in_nvp(self):
+ def test_delete_router_not_in_nsx(self):
res = self._create_router('json', 'tenant')
router = self.deserialize('json', res)
self.fc._fake_lrouter_dict.clear()
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
- def test_show_router_not_in_nvp(self):
+ def test_show_router_not_in_nsx(self):
res = self._create_router('json', 'tenant')
router = self.deserialize('json', res)
self.fc._fake_lrouter_dict.clear()
sub = self.deserialize('json', sub_res)
return net_id, sub['subnet']['id']
- def test_clear_gateway_nat_rule_not_in_nvp(self):
+ def test_clear_gateway_nat_rule_not_in_nsx(self):
# Create external network and subnet
ext_net_id = self._create_network_and_subnet('1.1.1.0/24', True)[0]
# Create internal network and subnet
router['router']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 200)
- # Delete NAT rule from NVP, clear gateway
+ # Delete NAT rule from NSX, clear gateway
# and verify operation still succeeds
self.fc._fake_lrouter_nat_dict.clear()
req = self.new_update_request(
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 200)
- def test_update_router_not_in_nvp(self):
+ def test_update_router_not_in_nsx(self):
res = self._create_router('json', 'tenant')
router = self.deserialize('json', res)
self.fc._fake_lrouter_dict.clear()
self.assertEqual(router['router']['status'],
constants.NET_STATUS_ERROR)
- def test_delete_security_group_not_in_nvp(self):
+ def test_delete_security_group_not_in_nsx(self):
res = self._create_security_group('json', 'name', 'desc')
sec_group = self.deserialize('json', res)
self.fc._fake_securityprofile_dict.clear()
sec_group['security_group']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
-
-
-class TestNiciraNetworkGateway(NiciraPluginV2TestCase,
- test_l2_gw.NetworkGatewayDbTestCase):
-
- def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
- cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
- super(TestNiciraNetworkGateway,
- self).setUp(plugin=plugin, ext_mgr=ext_mgr)
-
- def test_create_network_gateway_name_exceeds_40_chars(self):
- name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
- with self._network_gateway(name=name) as nw_gw:
- # Assert Neutron name is not truncated
- self.assertEqual(nw_gw[self.resource]['name'], name)
-
- def test_update_network_gateway_with_name_calls_backend(self):
- with mock.patch.object(
- nsxlib.l2gateway, 'update_l2_gw_service') as mock_update_gw:
- with self._network_gateway(name='cavani') as nw_gw:
- nw_gw_id = nw_gw[self.resource]['id']
- self._update(networkgw.COLLECTION_NAME, nw_gw_id,
- {self.resource: {'name': 'higuain'}})
- mock_update_gw.assert_called_once_with(
- mock.ANY, nw_gw_id, 'higuain')
-
- def test_update_network_gateway_without_name_does_not_call_backend(self):
- with mock.patch.object(
- nsxlib.l2gateway, 'update_l2_gw_service') as mock_update_gw:
- with self._network_gateway(name='something') as nw_gw:
- nw_gw_id = nw_gw[self.resource]['id']
- self._update(networkgw.COLLECTION_NAME, nw_gw_id,
- {self.resource: {}})
- self.assertEqual(mock_update_gw.call_count, 0)
-
- def test_update_network_gateway_name_exceeds_40_chars(self):
- new_name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
- with self._network_gateway(name='something') as nw_gw:
- nw_gw_id = nw_gw[self.resource]['id']
- self._update(networkgw.COLLECTION_NAME, nw_gw_id,
- {self.resource: {'name': new_name}})
- req = self.new_show_request(networkgw.COLLECTION_NAME,
- nw_gw_id)
- res = self.deserialize('json', req.get_response(self.ext_api))
- # Assert Neutron name is not truncated
- self.assertEqual(new_name, res[self.resource]['name'])
- # Assert NVP name is truncated
- self.assertEqual(
- new_name[:40],
- self.fc._fake_gatewayservice_dict[nw_gw_id]['display_name'])
-
- def test_create_network_gateway_nvp_error_returns_500(self):
- def raise_nvp_api_exc(*args, **kwargs):
- raise NvpApiClient.NvpApiException
-
- with mock.patch.object(nsxlib.l2gateway,
- 'create_l2_gw_service',
- new=raise_nvp_api_exc):
- res = self._create_network_gateway(
- self.fmt, 'xxx', name='yyy',
- devices=[{'id': uuidutils.generate_uuid()}])
- self.assertEqual(500, res.status_int)
-
- def test_create_network_gateway_nvp_error_returns_409(self):
- with mock.patch.object(nsxlib.l2gateway,
- 'create_l2_gw_service',
- side_effect=NvpApiClient.Conflict):
- res = self._create_network_gateway(
- self.fmt, 'xxx', name='yyy',
- devices=[{'id': uuidutils.generate_uuid()}])
- self.assertEqual(409, res.status_int)
-
- def test_list_network_gateways(self):
- with self._network_gateway(name='test-gw-1') as gw1:
- with self._network_gateway(name='test_gw_2') as gw2:
- req = self.new_list_request(networkgw.COLLECTION_NAME)
- res = self.deserialize('json', req.get_response(self.ext_api))
- # We expect the default gateway too
- key = self.resource + 's'
- self.assertEqual(len(res[key]), 3)
- self.assertEqual(res[key][0]['default'],
- True)
- self.assertEqual(res[key][1]['name'],
- gw1[self.resource]['name'])
- self.assertEqual(res[key][2]['name'],
- gw2[self.resource]['name'])
-
- def test_list_network_gateway_with_multiple_connections(self):
- self._test_list_network_gateway_with_multiple_connections(
- expected_gateways=2)
-
- def test_delete_network_gateway(self):
- # The default gateway must still be there
- self._test_delete_network_gateway(1)
-
- def test_show_network_gateway_nvp_error_returns_404(self):
- invalid_id = 'b5afd4a9-eb71-4af7-a082-8fc625a35b61'
- req = self.new_show_request(networkgw.COLLECTION_NAME, invalid_id)
- res = req.get_response(self.ext_api)
- self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
-
-
-class TestNiciraMultiProviderNetworks(NiciraPluginV2TestCase):
-
- def setUp(self, plugin=None):
- cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
- super(TestNiciraMultiProviderNetworks, self).setUp()
-
- def test_create_network_provider(self):
- data = {'network': {'name': 'net1',
- pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1,
- 'tenant_id': 'tenant_one'}}
- network_req = self.new_create_request('networks', data)
- network = self.deserialize(self.fmt,
- network_req.get_response(self.api))
- self.assertEqual(network['network'][pnet.NETWORK_TYPE], 'vlan')
- self.assertEqual(network['network'][pnet.PHYSICAL_NETWORK], 'physnet1')
- self.assertEqual(network['network'][pnet.SEGMENTATION_ID], 1)
- self.assertNotIn(mpnet.SEGMENTS, network['network'])
-
- def test_create_network_single_multiple_provider(self):
- data = {'network': {'name': 'net1',
- mpnet.SEGMENTS:
- [{pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1}],
- 'tenant_id': 'tenant_one'}}
- net_req = self.new_create_request('networks', data)
- network = self.deserialize(self.fmt, net_req.get_response(self.api))
- for provider_field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
- pnet.SEGMENTATION_ID]:
- self.assertNotIn(provider_field, network['network'])
- tz = network['network'][mpnet.SEGMENTS][0]
- self.assertEqual(tz[pnet.NETWORK_TYPE], 'vlan')
- self.assertEqual(tz[pnet.PHYSICAL_NETWORK], 'physnet1')
- self.assertEqual(tz[pnet.SEGMENTATION_ID], 1)
-
- # Tests get_network()
- net_req = self.new_show_request('networks', network['network']['id'])
- network = self.deserialize(self.fmt, net_req.get_response(self.api))
- tz = network['network'][mpnet.SEGMENTS][0]
- self.assertEqual(tz[pnet.NETWORK_TYPE], 'vlan')
- self.assertEqual(tz[pnet.PHYSICAL_NETWORK], 'physnet1')
- self.assertEqual(tz[pnet.SEGMENTATION_ID], 1)
-
- def test_create_network_multprovider(self):
- data = {'network': {'name': 'net1',
- mpnet.SEGMENTS:
- [{pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1},
- {pnet.NETWORK_TYPE: 'stt',
- pnet.PHYSICAL_NETWORK: 'physnet1'}],
- 'tenant_id': 'tenant_one'}}
- network_req = self.new_create_request('networks', data)
- network = self.deserialize(self.fmt,
- network_req.get_response(self.api))
- tz = network['network'][mpnet.SEGMENTS]
- for tz in data['network'][mpnet.SEGMENTS]:
- for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
- pnet.SEGMENTATION_ID]:
- self.assertEqual(tz.get(field), tz.get(field))
-
- # Tests get_network()
- net_req = self.new_show_request('networks', network['network']['id'])
- network = self.deserialize(self.fmt, net_req.get_response(self.api))
- tz = network['network'][mpnet.SEGMENTS]
- for tz in data['network'][mpnet.SEGMENTS]:
- for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
- pnet.SEGMENTATION_ID]:
- self.assertEqual(tz.get(field), tz.get(field))
-
- def test_create_network_with_provider_and_multiprovider_fail(self):
- data = {'network': {'name': 'net1',
- mpnet.SEGMENTS:
- [{pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1}],
- pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1,
- 'tenant_id': 'tenant_one'}}
-
- network_req = self.new_create_request('networks', data)
- res = network_req.get_response(self.api)
- self.assertEqual(res.status_int, 400)
-
- def test_create_network_duplicate_segments(self):
- data = {'network': {'name': 'net1',
- mpnet.SEGMENTS:
- [{pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1},
- {pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1}],
- 'tenant_id': 'tenant_one'}}
- network_req = self.new_create_request('networks', data)
- res = network_req.get_response(self.api)
- self.assertEqual(res.status_int, 400)
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright 2013 Nicira Networks, Inc.
+# Copyright 2013 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log
from neutron.plugins.nicira.common import sync
-from neutron.plugins.nicira import nsx_cluster
+from neutron.plugins.nicira import nsx_cluster as cluster
from neutron.plugins.nicira import NvpApiClient
-from neutron.plugins.nicira import nvplib
+from neutron.plugins.nicira import nvplib as nsx_utils
from neutron.plugins.vmware import plugin
from neutron.tests import base
from neutron.tests.unit import test_api_v2
-from neutron.tests.unit.vmware import fake_nvpapiclient
+from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import STUBS_PATH
{'uuid': _uuid(), 'name': 'lr-2'}]
-class NvpCacheTestCase(base.BaseTestCase):
- """Test suite providing coverage for the NvpCache class."""
+class CacheTestCase(base.BaseTestCase):
+ """Test suite providing coverage for the Cache class."""
def setUp(self):
- self.nvp_cache = sync.NvpCache()
+ self.nsx_cache = sync.NvpCache()
for lswitch in LSWITCHES:
- self.nvp_cache._uuid_dict_mappings[lswitch['uuid']] = (
- self.nvp_cache._lswitches)
- self.nvp_cache._lswitches[lswitch['uuid']] = (
+ self.nsx_cache._uuid_dict_mappings[lswitch['uuid']] = (
+ self.nsx_cache._lswitches)
+ self.nsx_cache._lswitches[lswitch['uuid']] = (
{'data': lswitch,
'hash': hash(json.dumps(lswitch))})
for lswitchport in LSWITCHPORTS:
- self.nvp_cache._uuid_dict_mappings[lswitchport['uuid']] = (
- self.nvp_cache._lswitchports)
- self.nvp_cache._lswitchports[lswitchport['uuid']] = (
+ self.nsx_cache._uuid_dict_mappings[lswitchport['uuid']] = (
+ self.nsx_cache._lswitchports)
+ self.nsx_cache._lswitchports[lswitchport['uuid']] = (
{'data': lswitchport,
'hash': hash(json.dumps(lswitchport))})
for lrouter in LROUTERS:
- self.nvp_cache._uuid_dict_mappings[lrouter['uuid']] = (
- self.nvp_cache._lrouters)
- self.nvp_cache._lrouters[lrouter['uuid']] = (
+ self.nsx_cache._uuid_dict_mappings[lrouter['uuid']] = (
+ self.nsx_cache._lrouters)
+ self.nsx_cache._lrouters[lrouter['uuid']] = (
{'data': lrouter,
'hash': hash(json.dumps(lrouter))})
- super(NvpCacheTestCase, self).setUp()
+ super(CacheTestCase, self).setUp()
def test_get_lswitches(self):
- ls_uuids = self.nvp_cache.get_lswitches()
+ ls_uuids = self.nsx_cache.get_lswitches()
self.assertEqual(set(ls_uuids),
set([ls['uuid'] for ls in LSWITCHES]))
def test_get_lswitchports(self):
- lp_uuids = self.nvp_cache.get_lswitchports()
+ lp_uuids = self.nsx_cache.get_lswitchports()
self.assertEqual(set(lp_uuids),
set([lp['uuid'] for lp in LSWITCHPORTS]))
def test_get_lrouters(self):
- lr_uuids = self.nvp_cache.get_lrouters()
+ lr_uuids = self.nsx_cache.get_lrouters()
self.assertEqual(set(lr_uuids),
set([lr['uuid'] for lr in LROUTERS]))
def test_get_lswitches_changed_only(self):
- ls_uuids = self.nvp_cache.get_lswitches(changed_only=True)
+ ls_uuids = self.nsx_cache.get_lswitches(changed_only=True)
self.assertEqual(0, len(ls_uuids))
def test_get_lswitchports_changed_only(self):
- lp_uuids = self.nvp_cache.get_lswitchports(changed_only=True)
+ lp_uuids = self.nsx_cache.get_lswitchports(changed_only=True)
self.assertEqual(0, len(lp_uuids))
def test_get_lrouters_changed_only(self):
- lr_uuids = self.nvp_cache.get_lrouters(changed_only=True)
+ lr_uuids = self.nsx_cache.get_lrouters(changed_only=True)
self.assertEqual(0, len(lr_uuids))
def _verify_update(self, new_resource, changed=True, hit=True):
- cached_resource = self.nvp_cache[new_resource['uuid']]
+ cached_resource = self.nsx_cache[new_resource['uuid']]
self.assertEqual(new_resource, cached_resource['data'])
self.assertEqual(hit, cached_resource.get('hit', False))
self.assertEqual(changed,
def test_update_lswitch_new_item(self):
new_switch_uuid = _uuid()
new_switch = {'uuid': new_switch_uuid, 'name': 'new_switch'}
- self.nvp_cache.update_lswitch(new_switch)
- self.assertIn(new_switch_uuid, self.nvp_cache._lswitches.keys())
+ self.nsx_cache.update_lswitch(new_switch)
+ self.assertIn(new_switch_uuid, self.nsx_cache._lswitches.keys())
self._verify_update(new_switch)
def test_update_lswitch_existing_item(self):
switch = LSWITCHES[0]
switch['name'] = 'new_name'
- self.nvp_cache.update_lswitch(switch)
- self.assertIn(switch['uuid'], self.nvp_cache._lswitches.keys())
+ self.nsx_cache.update_lswitch(switch)
+ self.assertIn(switch['uuid'], self.nsx_cache._lswitches.keys())
self._verify_update(switch)
def test_update_lswitchport_new_item(self):
new_switchport_uuid = _uuid()
new_switchport = {'uuid': new_switchport_uuid,
'name': 'new_switchport'}
- self.nvp_cache.update_lswitchport(new_switchport)
+ self.nsx_cache.update_lswitchport(new_switchport)
self.assertIn(new_switchport_uuid,
- self.nvp_cache._lswitchports.keys())
+ self.nsx_cache._lswitchports.keys())
self._verify_update(new_switchport)
def test_update_lswitchport_existing_item(self):
switchport = LSWITCHPORTS[0]
switchport['name'] = 'new_name'
- self.nvp_cache.update_lswitchport(switchport)
+ self.nsx_cache.update_lswitchport(switchport)
self.assertIn(switchport['uuid'],
- self.nvp_cache._lswitchports.keys())
+ self.nsx_cache._lswitchports.keys())
self._verify_update(switchport)
def test_update_lrouter_new_item(self):
new_router_uuid = _uuid()
new_router = {'uuid': new_router_uuid,
'name': 'new_router'}
- self.nvp_cache.update_lrouter(new_router)
+ self.nsx_cache.update_lrouter(new_router)
self.assertIn(new_router_uuid,
- self.nvp_cache._lrouters.keys())
+ self.nsx_cache._lrouters.keys())
self._verify_update(new_router)
def test_update_lrouter_existing_item(self):
router = LROUTERS[0]
router['name'] = 'new_name'
- self.nvp_cache.update_lrouter(router)
+ self.nsx_cache.update_lrouter(router)
self.assertIn(router['uuid'],
- self.nvp_cache._lrouters.keys())
+ self.nsx_cache._lrouters.keys())
self._verify_update(router)
def test_process_updates_initial(self):
# Clear cache content to simulate first-time filling
- self.nvp_cache._lswitches.clear()
- self.nvp_cache._lswitchports.clear()
- self.nvp_cache._lrouters.clear()
- self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
+ self.nsx_cache._lswitches.clear()
+ self.nsx_cache._lswitchports.clear()
+ self.nsx_cache._lrouters.clear()
+ self.nsx_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
self._verify_update(resource)
def test_process_updates_no_change(self):
- self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
+ self.nsx_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
self._verify_update(resource, changed=False)
def test_process_updates_with_changes(self):
LSWITCHES[0]['name'] = 'altered'
- self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
+ self.nsx_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
changed = (True if resource['uuid'] == LSWITCHES[0]['uuid']
else False)
def _test_process_updates_with_removals(self):
lswitches = LSWITCHES[:]
lswitch = lswitches.pop()
- self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
+ self.nsx_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
hit = (False if resource['uuid'] == lswitch['uuid']
else True)
def test_process_updates_cleanup_after_delete(self):
deleted_lswitch, lswitches = self._test_process_updates_with_removals()
- self.nvp_cache.process_deletes()
- self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
- self.assertNotIn(deleted_lswitch['uuid'], self.nvp_cache._lswitches)
+ self.nsx_cache.process_deletes()
+ self.nsx_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
+ self.assertNotIn(deleted_lswitch['uuid'], self.nsx_cache._lswitches)
def _verify_delete(self, resource, deleted=True, hit=True):
- cached_resource = self.nvp_cache[resource['uuid']]
+ cached_resource = self.nsx_cache[resource['uuid']]
data_field = 'data_bk' if deleted else 'data'
self.assertEqual(resource, cached_resource[data_field])
self.assertEqual(hit, cached_resource.get('hit', False))
def test_process_deletes_no_change(self):
# Mark all resources as hit
- self._set_hit(self.nvp_cache._lswitches.values())
- self._set_hit(self.nvp_cache._lswitchports.values())
- self._set_hit(self.nvp_cache._lrouters.values())
- self.nvp_cache.process_deletes()
+ self._set_hit(self.nsx_cache._lswitches.values())
+ self._set_hit(self.nsx_cache._lswitchports.values())
+ self._set_hit(self.nsx_cache._lrouters.values())
+ self.nsx_cache.process_deletes()
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
self._verify_delete(resource, hit=False, deleted=False)
def test_process_deletes_with_removals(self):
# Mark all resources but one as hit
uuid_to_delete = LSWITCHPORTS[0]['uuid']
- self._set_hit(self.nvp_cache._lswitches.values(),
+ self._set_hit(self.nsx_cache._lswitches.values(),
uuid_to_delete)
- self._set_hit(self.nvp_cache._lswitchports.values(),
+ self._set_hit(self.nsx_cache._lswitchports.values(),
uuid_to_delete)
- self._set_hit(self.nvp_cache._lrouters.values(),
+ self._set_hit(self.nsx_cache._lrouters.values(),
uuid_to_delete)
- self.nvp_cache.process_deletes()
+ self.nsx_cache.process_deletes()
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
deleted = resource['uuid'] == uuid_to_delete
self._verify_delete(resource, hit=False, deleted=deleted)
self.assertTrue(synchronizer._synchronize_state.call_count)
-class NvpSyncTestCase(base.BaseTestCase):
+class SyncTestCase(base.BaseTestCase):
def setUp(self):
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
+ # mock api client
+ self.fc = fake.FakeClient(STUBS_PATH)
+ mock_api = mock.patch(NSXAPI_NAME, autospec=True)
# Avoid runs of the synchronizer looping call
# These unit tests will excplicitly invoke synchronization
patch_sync = mock.patch.object(sync, '_start_loopingcall')
- self.mock_nvpapi = mock_nvpapi.start()
+ self.mock_api = mock_api.start()
patch_sync.start()
- self.mock_nvpapi.return_value.login.return_value = "the_cookie"
- # Emulate tests against NVP 3.x
- self.mock_nvpapi.return_value.get_nvp_version.return_value = (
+ self.mock_api.return_value.login.return_value = "the_cookie"
+ # Emulate tests against NSX 3.x
+ self.mock_api.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion("3.1"))
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- self.mock_nvpapi.return_value.request.side_effect = _fake_request
- self.fake_cluster = nsx_cluster.NSXCluster(
+ self.mock_api.return_value.request.side_effect = self.fc.fake_request
+ self.fake_cluster = cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
self.mock_nm_get_plugin = mock_nm_get_plugin.start()
self.mock_nm_get_plugin.return_value = self._plugin
mock_nm_get_service_plugins.start()
- super(NvpSyncTestCase, self).setUp()
+ super(SyncTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(patch_sync.stop)
- self.addCleanup(mock_nvpapi.stop)
+ self.addCleanup(mock_api.stop)
self.addCleanup(mock_nm_get_plugin.stop)
self.addCleanup(mock_nm_get_service_plugins.stop)
def tearDown(self):
cfg.CONF.reset()
- super(NvpSyncTestCase, self).tearDown()
+ super(SyncTestCase, self).tearDown()
@contextlib.contextmanager
def _populate_data(self, ctx, net_size=2, port_size=2, router_size=2):
def _test_sync_with_chunk_larger_maxpagesize(
self, net_size, port_size, router_size, chunk_size, exp_calls):
ctx = context.get_admin_context()
- real_func = nvplib.get_single_query_page
+ real_func = nsx_utils.get_single_query_page
sp = sync.SyncParameters(chunk_size)
with self._populate_data(ctx, net_size=net_size,
port_size=port_size,
# The following mock is just for counting calls,
# but we will still run the actual function
with mock.patch.object(
- nvplib, 'get_single_query_page',
+ nsx_utils, 'get_single_query_page',
side_effect=real_func) as mock_get_page:
self._test_sync(
constants.NET_STATUS_ACTIVE,
chunk_size=48, exp_calls=6)
def test_sync_multi_chunk(self):
- # The fake NVP API client cannot be used for this test
+ # The fake NSX API client cannot be used for this test
ctx = context.get_admin_context()
# Generate 4 networks, 1 port per network, and 4 routers
with self._populate_data(ctx, net_size=4, port_size=1, router_size=4):
q_rtr_data = self._plugin.get_router(ctx, q_rtr_id)
self.assertEqual(constants.NET_STATUS_DOWN, q_rtr_data['status'])
- def test_sync_nvp_failure_backoff(self):
- self.mock_nvpapi.return_value.request.side_effect = (
+ def test_sync_nsx_failure_backoff(self):
+ self.mock_api.return_value.request.side_effect = (
NvpApiClient.RequestTimeout)
# chunk size won't matter here
sp = sync.SyncParameters(999)
from neutron.db import api as db_api
from neutron.openstack.common import uuidutils
-from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import nsx_utils
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient
with mock.patch.object(self.fake_cluster.api_client,
'request',
side_effect=NvpApiClient.ReadOnlyMode):
- self.assertRaises(nvp_exc.MaintenanceInProgress,
+ self.assertRaises(nsx_exc.MaintenanceInProgress,
nvplib.do_request, cluster=self.fake_cluster)
def test_cluster_method_not_implemented(self):
self._edge_idx = 0
self._lswitches = {}
self._unique_router_name = unique_router_name
- self._fake_nvpapi = None
+ self._fake_nsx_api = None
self.fake_firewall_dict = {}
self.temp_firewall = {
"firewallRules": {
self._fake_app_profiles_dict = {}
self._fake_loadbalancer_config = {}
- def set_fake_nvpapi(self, fake_nvpapi):
- self._fake_nvpapi = fake_nvpapi
+ def set_fake_nsx_api(self, fake_nsx_api):
+ self._fake_nsx_api = fake_nsx_api
def _validate_edge_name(self, name):
for edge_id, edge in self._edges.iteritems():
return (header, response)
def create_lswitch(self, lsconfig):
- # The lswitch is created via VCNS API so the fake nvpapi wont
- # see it. Added to fake nvpapi here.
- if self._fake_nvpapi:
- lswitch = self._fake_nvpapi._add_lswitch(json.dumps(lsconfig))
+ # The lswitch is created via VCNS API so the fake nsx_api will not
+ # see it. Added to fake nsx_api here.
+ if self._fake_nsx_api:
+ lswitch = self._fake_nsx_api._add_lswitch(json.dumps(lsconfig))
else:
lswitch = lsconfig
lswitch['uuid'] = uuidutils.generate_uuid()
if id not in self._lswitches:
raise Exception(_("Lswitch %s does not exist") % id)
del self._lswitches[id]
- if self._fake_nvpapi:
+ if self._fake_nsx_api:
# TODO(fank): fix the hack
- del self._fake_nvpapi._fake_lswitch_dict[id]
+ del self._fake_nsx_api._fake_lswitch_dict[id]
header = {
'status': 200
}
return []
-class ServiceRouterTest(test_nsx_plugin.NiciraL3NatTest,
+class ServiceRouterTest(test_nsx_plugin.L3NatTest,
test_l3_plugin.L3NatTestCaseMixin):
def vcns_patch(self):
service_plugins=service_plugins,
ext_mgr=ext_mgr)
- self.fc2.set_fake_nvpapi(self.fc)
+ self.fc2.set_fake_nsx_api(self.fc)
self.addCleanup(self.fc2.reset_all)
self.addCleanup(mock.patch.stopall)
class ServiceRouterTestCase(ServiceRouterTest,
- test_nsx_plugin.TestNiciraL3NatTestCase):
+ test_nsx_plugin.TestL3NatTestCase):
def test_router_create(self):
name = 'router1'
# License for the specific language governing permissions and limitations
# under the License.
#
-# @author: linb, VMware
import contextlib
import mock
self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
self.vcns_firewall_patch()
- self.nvp_service_plugin_callback = mock.Mock()
- self.driver = vcns_driver.VcnsDriver(self.nvp_service_plugin_callback)
+ self.driver = vcns_driver.VcnsDriver(mock.Mock())
super(VcnsDriverTestCase, self).setUp()
self.addCleanup(self.fc2.reset_all)
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as const
from neutron.tests.unit.db.firewall import test_db_firewall
-from neutron.tests.unit.vmware import test_edge_router
+from neutron.tests.unit.vmware.vshield import test_edge_router
_uuid = uuidutils.generate_uuid
# License for the specific language governing permissions and limitations
# under the License.
#
-# @author: linb, VMware
import contextlib
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
-from neutron.tests.unit.vmware import test_edge_router
+from neutron.tests.unit.vmware.vshield import test_edge_router
_uuid = uuidutils.generate_uuid
from neutron import context
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.dbexts import vcns_db
-from neutron.plugins.nicira.vshield import (
- vcns_driver)
-from neutron.plugins.nicira.vshield.common import (
- exceptions as vcns_exc)
+from neutron.plugins.nicira.vshield.common import exceptions as vcns_exc
+from neutron.plugins.nicira.vshield import vcns_driver
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import VCNS_NAME
self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
self.vcns_loadbalancer_patch()
- self.nvp_service_plugin_callback = mock.Mock()
- self.driver = vcns_driver.VcnsDriver(self.nvp_service_plugin_callback)
+ self.driver = vcns_driver.VcnsDriver(mock.Mock())
super(VcnsDriverTestCase, self).setUp()
self.addCleanup(self.fc2.reset_all)
import mock
from neutron.common import config as n_config
-from neutron.plugins.nicira.vshield.common import (
- constants as vcns_const)
+from neutron.plugins.nicira.vshield.common import constants as vcns_const
from neutron.plugins.nicira.vshield.common.constants import RouterStatus
from neutron.plugins.nicira.vshield.tasks.constants import TaskState
from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus