from neutron.common import config
from neutron.plugins.vmware.common import config as nsx_config # noqa
from neutron.plugins.vmware.common import nsx_utils
-from neutron.plugins.vmware import nvplib
+from neutron.plugins.vmware import nsxlib
config.setup_logging(cfg.CONF)
def config_helper(config_entity, cluster):
try:
- return nvplib.do_request('GET',
+ return nsxlib.do_request('GET',
"/ws.v1/%s?fields=uuid" % config_entity,
cluster=cluster).get('results', [])
except Exception as e:
def is_transport_node_connected(cluster, node_uuid):
try:
- return nvplib.do_request('GET',
+ return nsxlib.do_request('GET',
"/ws.v1/transport-node/%s/status" % node_uuid,
cluster=cluster)['connection']['connected']
except Exception as e:
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import nsx_utils
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import router as routerlib
from neutron.plugins.vmware.nsxlib import switch as switchlib
-from neutron.plugins.vmware import nvplib
# Maximum page size for a single request
# NOTE(salv-orlando): This might become a version-dependent map should the
class NvpSynchronizer():
- LS_URI = nvplib._build_uri_path(
- nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
+ LS_URI = nsxlib._build_uri_path(
+ switchlib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
relations='LogicalSwitchStatus')
- LR_URI = nvplib._build_uri_path(
+ LR_URI = nsxlib._build_uri_path(
routerlib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
relations='LogicalRouterStatus')
- LP_URI = nvplib._build_uri_path(
- nvplib.LSWITCHPORT_RESOURCE,
+ LP_URI = nsxlib._build_uri_path(
+ switchlib.LSWITCHPORT_RESOURCE,
parent_resource_id='*',
fields='uuid,tags,fabric_status_up',
relations='LogicalPortStatus')
'max_page_size': MAX_PAGE_SIZE})
# Only the first request might return the total size,
# subsequent requests will definetely not
- results, cursor, total_size = nvplib.get_single_query_page(
+ results, cursor, total_size = nsxlib.get_single_query_page(
uri, self._cluster, cursor,
min(page_size, MAX_PAGE_SIZE))
for _req in range(num_requests - 1):
# resource type is below this threshold
if not cursor:
break
- req_results, cursor = nvplib.get_single_query_page(
+ req_results, cursor = nsxlib.get_single_query_page(
uri, self._cluster, cursor,
min(page_size, MAX_PAGE_SIZE))[:2]
results.extend(req_results)
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2014 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from neutron.common import exceptions as exception
+from neutron.openstack.common import log
+from neutron.plugins.vmware.api_client import exception as api_exc
+from neutron.plugins.vmware.common import exceptions as nsx_exc
+from neutron.version import version_info
+
+HTTP_GET = "GET"
+HTTP_POST = "POST"
+HTTP_DELETE = "DELETE"
+HTTP_PUT = "PUT"
+# Prefix to be used for all NVP API calls
+URI_PREFIX = "/ws.v1"
+NEUTRON_VERSION = version_info.release_string()
+
+LOG = log.getLogger(__name__)
+
+
+def _build_uri_path(resource,
+ resource_id=None,
+ parent_resource_id=None,
+ fields=None,
+ relations=None,
+ filters=None,
+ types=None,
+ is_attachment=False,
+ extra_action=None):
+ resources = resource.split('/')
+ res_path = resources[0] + (resource_id and "/%s" % resource_id or '')
+ if len(resources) > 1:
+ # There is also a parent resource to account for in the uri
+ res_path = "%s/%s/%s" % (resources[1],
+ parent_resource_id,
+ res_path)
+ if is_attachment:
+ res_path = "%s/attachment" % res_path
+ elif extra_action:
+ res_path = "%s/%s" % (res_path, extra_action)
+ params = []
+ params.append(fields and "fields=%s" % fields)
+ params.append(relations and "relations=%s" % relations)
+ params.append(types and "types=%s" % types)
+ if filters:
+ params.extend(['%s=%s' % (k, v) for (k, v) in filters.iteritems()])
+ uri_path = "%s/%s" % (URI_PREFIX, res_path)
+ non_empty_params = [x for x in params if x is not None]
+ if non_empty_params:
+ query_string = '&'.join(non_empty_params)
+ if query_string:
+ uri_path += "?%s" % query_string
+ return uri_path
+
+
+def format_exception(etype, e, exception_locals):
+ """Consistent formatting for exceptions.
+
+ :param etype: a string describing the exception type.
+ :param e: the exception.
+ :param execption_locals: calling context local variable dict.
+ :returns: a formatted string.
+ """
+ msg = [_("Error. %(type)s exception: %(exc)s.") %
+ {'type': etype, 'exc': e}]
+ l = dict((k, v) for k, v in exception_locals.iteritems()
+ if k != 'request')
+ msg.append(_("locals=[%s]") % str(l))
+ return ' '.join(msg)
+
+
+def do_request(*args, **kwargs):
+ """Issue a request to the cluster specified in kwargs.
+
+ :param args: a list of positional arguments.
+ :param kwargs: a list of keyworkds arguments.
+ :returns: the result of the operation loaded into a python
+ object or None.
+ """
+ cluster = kwargs["cluster"]
+ try:
+ res = cluster.api_client.request(*args)
+ if res:
+ return json.loads(res)
+ except api_exc.ResourceNotFound:
+ raise exception.NotFound()
+ except api_exc.ReadOnlyMode:
+ raise nsx_exc.MaintenanceInProgress()
+
+
+def get_single_query_page(path, cluster, page_cursor=None,
+ page_length=1000, neutron_only=True):
+ params = []
+ if page_cursor:
+ params.append("_page_cursor=%s" % page_cursor)
+ params.append("_page_length=%s" % page_length)
+ # NOTE(salv-orlando): On the NSX backend the 'Quantum' tag is still
+ # used for marking Neutron entities in order to preserve compatibility
+ if neutron_only:
+ params.append("tag_scope=quantum")
+ query_params = "&".join(params)
+ path = "%s%s%s" % (path, "&" if (path.find("?") != -1) else "?",
+ query_params)
+ body = do_request(HTTP_GET, path, cluster=cluster)
+ # Result_count won't be returned if _page_cursor is supplied
+ return body['results'], body.get('page_cursor'), body.get('result_count')
+
+
+def get_all_query_pages(path, cluster):
+ need_more_results = True
+ result_list = []
+ page_cursor = None
+ while need_more_results:
+ results, page_cursor = get_single_query_page(
+ path, cluster, page_cursor)[:2]
+ if not page_cursor:
+ need_more_results = False
+ result_list.extend(results)
+ return result_list
+
+
+def mk_body(**kwargs):
+ """Convenience function creates and dumps dictionary to string.
+
+ :param kwargs: the key/value pirs to be dumped into a json string.
+ :returns: a json string.
+ """
+ return json.dumps(kwargs, ensure_ascii=False)
from neutron.openstack.common import log
from neutron.plugins.vmware.common import utils
+from neutron.plugins.vmware.nsxlib import _build_uri_path
+from neutron.plugins.vmware.nsxlib import do_request
+from neutron.plugins.vmware.nsxlib import get_all_query_pages
from neutron.plugins.vmware.nsxlib import switch
-from neutron.plugins.vmware.nvplib import _build_uri_path
-from neutron.plugins.vmware.nvplib import do_request
-from neutron.plugins.vmware.nvplib import get_all_query_pages
HTTP_GET = "GET"
HTTP_POST = "POST"
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nvplib import _build_uri_path
-from neutron.plugins.vmware.nvplib import do_request
+from neutron.plugins.vmware.nsxlib import _build_uri_path
+from neutron.plugins.vmware.nsxlib import do_request
HTTP_GET = "GET"
HTTP_POST = "POST"
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nvplib import _build_uri_path
-from neutron.plugins.vmware.nvplib import do_request
+from neutron.plugins.vmware.nsxlib import _build_uri_path
+from neutron.plugins.vmware.nsxlib import do_request
HTTP_POST = "POST"
HTTP_DELETE = "DELETE"
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
+from neutron.plugins.vmware.nsxlib import _build_uri_path
+from neutron.plugins.vmware.nsxlib import do_request
+from neutron.plugins.vmware.nsxlib import get_all_query_pages
from neutron.plugins.vmware.nsxlib.switch import get_port
from neutron.plugins.vmware.nsxlib.versioning import DEFAULT_VERSION
from neutron.plugins.vmware.nsxlib.versioning import versioned
-from neutron.plugins.vmware.nvplib import _build_uri_path
-from neutron.plugins.vmware.nvplib import do_request
-from neutron.plugins.vmware.nvplib import get_all_query_pages
HTTP_GET = "GET"
HTTP_POST = "POST"
from neutron.common import exceptions
from neutron.openstack.common import log
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nvplib import _build_uri_path
-from neutron.plugins.vmware.nvplib import do_request
-from neutron.plugins.vmware.nvplib import format_exception
-from neutron.plugins.vmware.nvplib import get_all_query_pages
+from neutron.plugins.vmware.nsxlib import _build_uri_path
+from neutron.plugins.vmware.nsxlib import do_request
+from neutron.plugins.vmware.nsxlib import format_exception
+from neutron.plugins.vmware.nsxlib import get_all_query_pages
HTTP_GET = "GET"
HTTP_POST = "POST"
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nvplib import _build_uri_path
-from neutron.plugins.vmware.nvplib import do_request
-from neutron.plugins.vmware.nvplib import get_all_query_pages
+from neutron.plugins.vmware.nsxlib import _build_uri_path
+from neutron.plugins.vmware.nsxlib import do_request
+from neutron.plugins.vmware.nsxlib import get_all_query_pages
HTTP_GET = "GET"
HTTP_POST = "POST"
+++ /dev/null
-# Copyright 2012 VMware, Inc.
-# All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import json
-
-
-from neutron.common import exceptions as exception
-from neutron.openstack.common import log
-from neutron.plugins.vmware.api_client import exception as api_exc
-from neutron.plugins.vmware.common import exceptions as nsx_exc
-
-
-LOG = log.getLogger(__name__)
-# HTTP METHODS CONSTANTS
-HTTP_GET = "GET"
-HTTP_POST = "POST"
-HTTP_DELETE = "DELETE"
-HTTP_PUT = "PUT"
-# Prefix to be used for all NSX API calls
-URI_PREFIX = "/ws.v1"
-
-LSWITCH_RESOURCE = "lswitch"
-LSWITCHPORT_RESOURCE = "lport/%s" % LSWITCH_RESOURCE
-
-# Maximum page size for a single request
-# NOTE(salv-orlando): This might become a version-dependent map should the
-# limit be raised in future versions
-MAX_PAGE_SIZE = 5000
-
-
-def _build_uri_path(resource,
- resource_id=None,
- parent_resource_id=None,
- fields=None,
- relations=None,
- filters=None,
- types=None,
- is_attachment=False,
- extra_action=None):
- resources = resource.split('/')
- res_path = resources[0] + (resource_id and "/%s" % resource_id or '')
- if len(resources) > 1:
- # There is also a parent resource to account for in the uri
- res_path = "%s/%s/%s" % (resources[1],
- parent_resource_id,
- res_path)
- if is_attachment:
- res_path = "%s/attachment" % res_path
- elif extra_action:
- res_path = "%s/%s" % (res_path, extra_action)
- params = []
- params.append(fields and "fields=%s" % fields)
- params.append(relations and "relations=%s" % relations)
- params.append(types and "types=%s" % types)
- if filters:
- params.extend(['%s=%s' % (k, v) for (k, v) in filters.iteritems()])
- uri_path = "%s/%s" % (URI_PREFIX, res_path)
- non_empty_params = [x for x in params if x is not None]
- if non_empty_params:
- query_string = '&'.join(non_empty_params)
- if query_string:
- uri_path += "?%s" % query_string
- return uri_path
-
-
-def get_single_query_page(path, cluster, page_cursor=None,
- page_length=1000, neutron_only=True):
- params = []
- if page_cursor:
- params.append("_page_cursor=%s" % page_cursor)
- params.append("_page_length=%s" % page_length)
- # NOTE(salv-orlando): On the NSX backend the 'Quantum' tag is still
- # used for marking Neutron entities in order to preserve compatibility
- if neutron_only:
- params.append("tag_scope=quantum")
- query_params = "&".join(params)
- path = "%s%s%s" % (path, "&" if (path.find("?") != -1) else "?",
- query_params)
- body = do_request(HTTP_GET, path, cluster=cluster)
- # Result_count won't be returned if _page_cursor is supplied
- return body['results'], body.get('page_cursor'), body.get('result_count')
-
-
-def get_all_query_pages(path, c):
- need_more_results = True
- result_list = []
- page_cursor = None
- while need_more_results:
- results, page_cursor = get_single_query_page(
- path, c, page_cursor)[:2]
- if not page_cursor:
- need_more_results = False
- result_list.extend(results)
- return result_list
-
-
-def format_exception(etype, e, exception_locals):
- """Consistent formatting for exceptions.
-
- :param etype: a string describing the exception type.
- :param e: the exception.
- :param execption_locals: calling context local variable dict.
- :returns: a formatted string.
- """
- msg = [_("Error. %(type)s exception: %(exc)s.") %
- {'type': etype, 'exc': e}]
- l = dict((k, v) for k, v in exception_locals.iteritems()
- if k != 'request')
- msg.append(_("locals=[%s]") % str(l))
- return ' '.join(msg)
-
-
-def do_request(*args, **kwargs):
- """Issue a request to the cluster specified in kwargs.
-
- :param args: a list of positional arguments.
- :param kwargs: a list of keyworkds arguments.
- :returns: the result of the operation loaded into a python
- object or None.
- """
- cluster = kwargs["cluster"]
- try:
- res = cluster.api_client.request(*args)
- if res:
- return json.loads(res)
- except api_exc.ResourceNotFound:
- raise exception.NotFound()
- except api_exc.ReadOnlyMode:
- raise nsx_exc.MaintenanceInProgress()
from neutron.plugins.vmware.api_client import client as nsx_client
from neutron.plugins.vmware.api_client import eventlet_client
from neutron.plugins.vmware import extensions
-from neutron.plugins.vmware import nvplib
import neutron.plugins.vmware.plugin as neutron_plugin
from neutron.plugins.vmware.vshield.common import VcnsApiClient as vcnsapi
from neutron.plugins.vmware.vshield import vcns
STUBS_PATH = os.path.join(os.path.dirname(__file__), 'etc')
NSXEXT_PATH = os.path.dirname(extensions.__file__)
NSXAPI_NAME = '%s.%s' % (api_client.__module__, api_client.__name__)
-NSXLIB_NAME = nvplib.__name__
PLUGIN_NAME = '%s.%s' % (plugin.__module__, plugin.__name__)
SERVICE_PLUGIN_NAME = '%s.%s' % (service_plugin.__module__,
service_plugin.__name__)
return os.path.join(STUBS_PATH, filename)
-def nsx_method(method_name, module_name='nvplib'):
+def nsx_method(method_name, module_name='nsxlib'):
return '%s.%s.%s' % ('neutron.plugins.vmware', module_name, method_name)
#
from neutron.plugins.vmware.api_client import exception
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import l2gateway as l2gwlib
from neutron.plugins.vmware.nsxlib import switch as switchlib
from neutron.tests.unit import test_api_v2
l2gwlib.plug_l2_gw_service(
self.fake_cluster, lswitch['uuid'],
lport['uuid'], gw_id)
- uri = l2gwlib._build_uri_path(switchlib.LSWITCHPORT_RESOURCE,
- lport['uuid'],
- lswitch['uuid'],
- is_attachment=True)
- resp_obj = l2gwlib.do_request("GET", uri,
- cluster=self.fake_cluster)
+ uri = nsxlib._build_uri_path(switchlib.LSWITCHPORT_RESOURCE,
+ lport['uuid'],
+ lswitch['uuid'],
+ is_attachment=True)
+ resp_obj = nsxlib.do_request("GET", uri,
+ cluster=self.fake_cluster)
self.assertIn('LogicalPortAttachment', resp_obj)
self.assertEqual(resp_obj['LogicalPortAttachment']['type'],
'L2GatewayAttachment')
from neutron.plugins.vmware.api_client.version import Version
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import router as routerlib
from neutron.plugins.vmware.nsxlib import switch as switchlib
from neutron.tests.unit import test_api_v2
self.fake_cluster, lrouter['uuid'], '10.0.0.99',
match_criteria={'destination_ip_addresses':
'192.168.0.5'})
- uri = routerlib._build_uri_path(routerlib.LROUTERNAT_RESOURCE,
- nat_rule['uuid'],
- lrouter['uuid'])
- resp_obj = routerlib.do_request(
- "GET", uri, cluster=self.fake_cluster)
+ uri = nsxlib._build_uri_path(routerlib.LROUTERNAT_RESOURCE,
+ nat_rule['uuid'],
+ lrouter['uuid'])
+ resp_obj = nsxlib.do_request("GET", uri, cluster=self.fake_cluster)
self.assertEqual('DestinationNatRule', resp_obj['type'])
self.assertEqual('192.168.0.5',
resp_obj['match']['destination_ip_addresses'])
#
from neutron.common import exceptions
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import secgroup as secgrouplib
-from neutron.plugins.vmware import nvplib as nsx_utils
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware.nsxlib import base
def test_create_and_get_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
- sec_prof_res = secgrouplib.do_request(
+ sec_prof_res = nsxlib.do_request(
secgrouplib.HTTP_GET,
- nsx_utils._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ nsxlib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
def test_create_and_get_default_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, _uuid(), 'pippo', {'name': 'default'})
- sec_prof_res = nsx_utils.do_request(
+ sec_prof_res = nsxlib.do_request(
secgrouplib.HTTP_GET,
- nsx_utils._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ nsxlib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
'logical_port_ingress_rules': [ingress_rule]}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
- sec_prof_res = nsx_utils.do_request(
- secgrouplib.HTTP_GET,
- nsx_utils._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ sec_prof_res = nsxlib.do_request(
+ nsxlib.HTTP_GET,
+ nsxlib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
'logical_port_ingress_rules': []}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
- sec_prof_res = nsx_utils.do_request(
- nsx_utils.HTTP_GET,
- nsx_utils._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
+ sec_prof_res = nsxlib.do_request(
+ nsxlib.HTTP_GET,
+ nsxlib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
secgrouplib.delete_security_profile(
self.fake_cluster, sec_prof['uuid'])
self.assertRaises(exceptions.NotFound,
- secgrouplib.do_request,
- secgrouplib.HTTP_GET,
- nsx_utils._build_uri_path(
+ nsxlib.do_request,
+ nsxlib.HTTP_GET,
+ nsxlib._build_uri_path(
'security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
from neutron.plugins.vmware.api_client import version
from neutron.plugins.vmware.common import sync
from neutron.plugins.vmware import nsx_cluster as cluster
-from neutron.plugins.vmware import nvplib as nsx_utils
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware import plugin
from neutron.tests import base
from neutron.tests.unit import test_api_v2
def _test_sync_with_chunk_larger_maxpagesize(
self, net_size, port_size, router_size, chunk_size, exp_calls):
ctx = context.get_admin_context()
- real_func = nsx_utils.get_single_query_page
+ real_func = nsxlib.get_single_query_page
sp = sync.SyncParameters(chunk_size)
with self._populate_data(ctx, net_size=net_size,
port_size=port_size,
# The following mock is just for counting calls,
# but we will still run the actual function
with mock.patch.object(
- nsx_utils, 'get_single_query_page',
+ nsxlib, 'get_single_query_page',
side_effect=real_func) as mock_get_page:
self._test_sync(
constants.NET_STATUS_ACTIVE,
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import nsx_utils
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware import nvplib
+from neutron.plugins.vmware import nsxlib
from neutron.tests import base
from neutron.tests.unit.vmware import nsx_method
from neutron.tests.unit.vmware.nsxlib import base as nsx_base
module_name='dbexts.db')).start()
def _verify_get_nsx_switch_and_port_id(self, exp_ls_uuid, exp_lp_uuid):
- # The nvplib and db calls are mocked, therefore the cluster
+ # The nsxlib and db calls are mocked, therefore the cluster
# and the neutron_port_id parameters can be set to None
ls_uuid, lp_uuid = nsx_utils.get_nsx_switch_and_port_id(
db_api.get_session(), None, None)
self.assertEqual(exp_lp_uuid, lp_uuid)
def _verify_get_nsx_switch_ids(self, exp_ls_uuids):
- # The nvplib and db calls are mocked, therefore the cluster
+ # The nsxlib and db calls are mocked, therefore the cluster
# and the neutron_router_id parameters can be set to None
ls_uuids = nsx_utils.get_nsx_switch_ids(
db_api.get_session(), None, None)
self.assertEqual(len(result), utils.MAX_DISPLAY_NAME_LEN)
def test_build_uri_path_plain(self):
- result = nvplib._build_uri_path('RESOURCE')
- self.assertEqual("%s/%s" % (nvplib.URI_PREFIX, 'RESOURCE'), result)
+ result = nsxlib._build_uri_path('RESOURCE')
+ self.assertEqual("%s/%s" % (nsxlib.URI_PREFIX, 'RESOURCE'), result)
def test_build_uri_path_with_field(self):
- result = nvplib._build_uri_path('RESOURCE', fields='uuid')
- expected = "%s/%s?fields=uuid" % (nvplib.URI_PREFIX, 'RESOURCE')
+ result = nsxlib._build_uri_path('RESOURCE', fields='uuid')
+ expected = "%s/%s?fields=uuid" % (nsxlib.URI_PREFIX, 'RESOURCE')
self.assertEqual(expected, result)
def test_build_uri_path_with_filters(self):
filters = {"tag": 'foo', "tag_scope": "scope_foo"}
- result = nvplib._build_uri_path('RESOURCE', filters=filters)
+ result = nsxlib._build_uri_path('RESOURCE', filters=filters)
expected = (
"%s/%s?tag_scope=scope_foo&tag=foo" %
- (nvplib.URI_PREFIX, 'RESOURCE'))
+ (nsxlib.URI_PREFIX, 'RESOURCE'))
self.assertEqual(expected, result)
def test_build_uri_path_with_resource_id(self):
res = 'RESOURCE'
res_id = 'resource_id'
- result = nvplib._build_uri_path(res, resource_id=res_id)
- expected = "%s/%s/%s" % (nvplib.URI_PREFIX, res, res_id)
+ result = nsxlib._build_uri_path(res, resource_id=res_id)
+ expected = "%s/%s/%s" % (nsxlib.URI_PREFIX, res, res_id)
self.assertEqual(expected, result)
def test_build_uri_path_with_parent_and_resource_id(self):
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
- result = nvplib._build_uri_path(
+ result = nsxlib._build_uri_path(
res, parent_resource_id=par_id, resource_id=res_id)
expected = ("%s/%s/%s/%s/%s" %
- (nvplib.URI_PREFIX, parent_res, par_id, child_res, res_id))
+ (nsxlib.URI_PREFIX, parent_res, par_id, child_res, res_id))
self.assertEqual(expected, result)
def test_build_uri_path_with_attachment(self):
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
- result = nvplib._build_uri_path(res, parent_resource_id=par_id,
+ result = nsxlib._build_uri_path(res, parent_resource_id=par_id,
resource_id=res_id, is_attachment=True)
expected = ("%s/%s/%s/%s/%s/%s" %
- (nvplib.URI_PREFIX, parent_res,
+ (nsxlib.URI_PREFIX, parent_res,
par_id, child_res, res_id, 'attachment'))
self.assertEqual(expected, result)
res = '%s/%s' % (child_res, parent_res)
par_id = 'parent_resource_id'
res_id = 'resource_id'
- result = nvplib._build_uri_path(res, parent_resource_id=par_id,
+ result = nsxlib._build_uri_path(res, parent_resource_id=par_id,
resource_id=res_id, extra_action='doh')
expected = ("%s/%s/%s/%s/%s/%s" %
- (nvplib.URI_PREFIX, parent_res,
+ (nsxlib.URI_PREFIX, parent_res,
par_id, child_res, res_id, 'doh'))
self.assertEqual(expected, result)
'request',
side_effect=api_exc.ReadOnlyMode):
self.assertRaises(nsx_exc.MaintenanceInProgress,
- nvplib.do_request, cluster=self.fake_cluster)
+ nsxlib.do_request, cluster=self.fake_cluster)
def test_cluster_method_not_implemented(self):
self.assertRaises(api_exc.NsxApiException,
- nvplib.do_request,
- nvplib.HTTP_GET,
- nvplib._build_uri_path('MY_FAKE_RESOURCE',
+ nsxlib.do_request,
+ nsxlib.HTTP_GET,
+ nsxlib._build_uri_path('MY_FAKE_RESOURCE',
resource_id='foo'),
cluster=self.fake_cluster)