from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib
from neutron.plugins.nicira.nsxlib import queue as queuelib
from neutron.plugins.nicira.nsxlib import router as routerlib
+from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib
from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient
-from neutron.plugins.nicira import nvplib
-
LOG = logging.getLogger("NeutronPlugin")
context.session, port_data['id'],
selected_lswitch['uuid'], lport['uuid'])
if port_data['device_owner'] not in self.port_special_owners:
- switchlib.plug_interface(
+ switchlib.plug_vif_interface(
self.cluster, selected_lswitch['uuid'],
lport['uuid'], "VifAttachment", port_data['id'])
LOG.debug(_("_nvp_create_port completed for port %(name)s "
nsx_switch_id, nsx_port_id = nsx_utils.get_nsx_switch_and_port_id(
context.session, self.cluster, port_id)
# Unplug current attachment from lswitch port
- switchlib.plug_interface(self.cluster, nsx_switch_id,
- nsx_port_id, "NoAttachment")
+ switchlib.plug_vif_interface(self.cluster, nsx_switch_id,
+ nsx_port_id, "NoAttachment")
# Create logical router port and plug patch attachment
self._create_and_attach_router_port(
self.cluster, context, nsx_router_id, port_data,
if not default_sg:
self._ensure_default_security_group(context, tenant_id)
- nvp_secgroup = nvplib.create_security_profile(self.cluster,
- tenant_id, s)
- security_group['security_group']['id'] = nvp_secgroup['uuid']
+ nsx_secgroup = secgrouplib.create_security_profile(self.cluster,
+ tenant_id, s)
+ security_group['security_group']['id'] = nsx_secgroup['uuid']
return super(NvpPluginV2, self).create_security_group(
context, security_group, default_sg)
context, filters):
raise ext_sg.SecurityGroupInUse(id=security_group['id'])
try:
- nvplib.delete_security_profile(
+ secgrouplib.delete_security_profile(
self.cluster, security_group['id'])
except q_exc.NotFound:
LOG.info(_("Security group: %s was already deleted "
# of them to PUT to NVP.
combined_rules = self._merge_security_group_rules_with_current(
context, s, security_group['id'])
- nvplib.update_security_group_rules(self.cluster,
- security_group['id'],
- combined_rules)
+ secgrouplib.update_security_group_rules(self.cluster,
+ security_group['id'],
+ combined_rules)
return super(
NvpPluginV2, self).create_security_group_rule_bulk_native(
context, security_group_rule)
self._remove_security_group_with_id_and_id_field(
current_rules, sgrid)
- nvplib.update_security_group_rules(
+ secgrouplib.update_security_group_rules(
self.cluster, sgid, current_rules)
return super(NvpPluginV2, self).delete_security_group_rule(context,
sgrid)
from neutron.plugins.common import constants as service_constants
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.dbexts import servicerouter as sr_db
from neutron.plugins.nicira.dbexts import vcns_db
from neutron.plugins.nicira.dbexts import vcns_models
from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient
-from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira.vshield.common import (
constants as vcns_const)
from neutron.plugins.nicira.vshield.common.constants import RouterStatus
def _process_base_create_lswitch_args(*args, **kwargs):
- tags = [{"tag": nvplib.NEUTRON_VERSION, "scope": "quantum"}]
+ tags = utils.get_tags()
tags.append({"tag": args[1],
"scope": "quantum_net_id"})
if args[2]:
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
+# Maximum page size for a single request
+# NOTE(salv-orlando): This might become a version-dependent map should the
+# limit be raised in future versions
+MAX_PAGE_SIZE = 5000
+
LOG = log.getLogger(__name__)
# API. In this case the request should be split in multiple
# requests. This is not ideal, and therefore a log warning will
# be emitted.
- num_requests = page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1
+ num_requests = page_size / (MAX_PAGE_SIZE + 1) + 1
if num_requests > 1:
LOG.warn(_("Requested page size is %(cur_chunk_size)d."
"It might be necessary to do %(num_requests)d "
"is less than %(max_page_size)d"),
{'cur_chunk_size': page_size,
'num_requests': num_requests,
- 'max_page_size': nvplib.MAX_PAGE_SIZE})
+ 'max_page_size': MAX_PAGE_SIZE})
# Only the first request might return the total size,
# subsequent requests will definetely not
results, cursor, total_size = nvplib.get_single_query_page(
uri, self._cluster, cursor,
- min(page_size, nvplib.MAX_PAGE_SIZE))
+ min(page_size, MAX_PAGE_SIZE))
for _req in range(num_requests - 1):
# If no cursor is returned break the cycle as there is no
# actual need to perform multiple requests (all fetched)
break
req_results, cursor = nvplib.get_single_query_page(
uri, self._cluster, cursor,
- min(page_size, nvplib.MAX_PAGE_SIZE))[:2]
+ min(page_size, MAX_PAGE_SIZE))[:2]
results.extend(req_results)
# reset cursor before returning if we queried just to
# know the number of entities
from neutron.openstack.common import log
from neutron.plugins.nicira.common import utils
+from neutron.plugins.nicira.nsxlib import switch
from neutron.plugins.nicira.nvplib import _build_uri_path
-from neutron.plugins.nicira.nvplib import _plug_interface
from neutron.plugins.nicira.nvplib import do_request
from neutron.plugins.nicira.nvplib import get_all_query_pages
'l2_gateway_service_uuid': gateway_id}
if vlan_id:
att_obj['vlan_id'] = vlan_id
- return _plug_interface(cluster, lswitch_id, lport_id, att_obj)
+ return switch.plug_interface(cluster, lswitch_id, lport_id, att_obj)
def get_l2_gw_service(cluster, gateway_id):
--- /dev/null
+# Copyright 2014 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+from neutron.common import constants
+from neutron.common import exceptions
+from neutron.openstack.common import log
+from neutron.plugins.nicira.common import utils
+from neutron.plugins.nicira.nvplib import do_request
+from neutron.plugins.nicira.nvplib import format_exception
+
+HTTP_GET = "GET"
+HTTP_POST = "POST"
+HTTP_DELETE = "DELETE"
+HTTP_PUT = "PUT"
+
+LOG = log.getLogger(__name__)
+
+
+def mk_body(**kwargs):
+ """Convenience function creates and dumps dictionary to string.
+
+ :param kwargs: the key/value pirs to be dumped into a json string.
+ :returns: a json string.
+ """
+ return json.dumps(kwargs, ensure_ascii=False)
+
+
+def create_security_profile(cluster, tenant_id, security_profile):
+ path = "/ws.v1/security-profile"
+ # Allow all dhcp responses and all ingress traffic
+ hidden_rules = {'logical_port_egress_rules':
+ [{'ethertype': 'IPv4',
+ 'protocol': constants.PROTO_NUM_UDP,
+ 'port_range_min': constants.DHCP_RESPONSE_PORT,
+ 'port_range_max': constants.DHCP_RESPONSE_PORT,
+ 'ip_prefix': '0.0.0.0/0'}],
+ 'logical_port_ingress_rules':
+ [{'ethertype': 'IPv4'},
+ {'ethertype': 'IPv6'}]}
+ display_name = utils.check_and_truncate(security_profile.get('name'))
+ body = mk_body(
+ tags=utils.get_tags(os_tid=tenant_id), display_name=display_name,
+ logical_port_ingress_rules=(
+ hidden_rules['logical_port_ingress_rules']),
+ logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
+ )
+ rsp = do_request(HTTP_POST, path, body, cluster=cluster)
+ if security_profile.get('name') == 'default':
+ # If security group is default allow ip traffic between
+ # members of the same security profile is allowed and ingress traffic
+ # from the switch
+ rules = {'logical_port_egress_rules': [{'ethertype': 'IPv4',
+ 'profile_uuid': rsp['uuid']},
+ {'ethertype': 'IPv6',
+ 'profile_uuid': rsp['uuid']}],
+ 'logical_port_ingress_rules': [{'ethertype': 'IPv4'},
+ {'ethertype': 'IPv6'}]}
+
+ update_security_group_rules(cluster, rsp['uuid'], rules)
+ LOG.debug(_("Created Security Profile: %s"), rsp)
+ return rsp
+
+
+def update_security_group_rules(cluster, spid, rules):
+ path = "/ws.v1/security-profile/%s" % spid
+
+ # Allow all dhcp responses in
+ rules['logical_port_egress_rules'].append(
+ {'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
+ 'port_range_min': constants.DHCP_RESPONSE_PORT,
+ 'port_range_max': constants.DHCP_RESPONSE_PORT,
+ 'ip_prefix': '0.0.0.0/0'})
+ # If there are no ingress rules add bunk rule to drop all ingress traffic
+ if not rules['logical_port_ingress_rules']:
+ rules['logical_port_ingress_rules'].append(
+ {'ethertype': 'IPv4', 'ip_prefix': '127.0.0.1/32'})
+ try:
+ body = mk_body(
+ logical_port_ingress_rules=rules['logical_port_ingress_rules'],
+ logical_port_egress_rules=rules['logical_port_egress_rules'])
+ rsp = do_request(HTTP_PUT, path, body, cluster=cluster)
+ except exceptions.NotFound as e:
+ LOG.error(format_exception("Unknown", e, locals()))
+ #FIXME(salvatore-orlando): This should not raise NeutronException
+ raise exceptions.NeutronException()
+ LOG.debug(_("Updated Security Profile: %s"), rsp)
+ return rsp
+
+
+def delete_security_profile(cluster, spid):
+ path = "/ws.v1/security-profile/%s" % spid
+
+ try:
+ do_request(HTTP_DELETE, path, cluster=cluster)
+ except exceptions.NotFound:
+ # This is not necessarily an error condition
+ LOG.warn(_("Unable to find security profile %s on NSX backend"),
+ spid)
+ raise
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira.nvplib import _build_uri_path
-from neutron.plugins.nicira.nvplib import _plug_interface
from neutron.plugins.nicira.nvplib import do_request
from neutron.plugins.nicira.nvplib import get_all_query_pages
return constants.PORT_STATUS_DOWN
-def plug_interface(cluster, lswitch_id, port, port_type, attachment=None):
+def plug_interface(cluster, lswitch_id, lport_id, att_obj):
+ return do_request(HTTP_PUT,
+ _build_uri_path(LSWITCHPORT_RESOURCE,
+ lport_id, lswitch_id,
+ is_attachment=True),
+ json.dumps(att_obj),
+ cluster=cluster)
+
+
+def plug_vif_interface(
+ cluster, lswitch_id, port_id, port_type, attachment=None):
"""Plug a VIF Attachment object in a logical port."""
lport_obj = {}
if attachment:
lport_obj["vif_uuid"] = attachment
lport_obj["type"] = port_type
- return _plug_interface(cluster, lswitch_id, port, lport_obj)
+ return plug_interface(cluster, lswitch_id, port_id, lport_obj)
import json
-#FIXME(danwent): I'd like this file to get to the point where it has
-# no neutron-specific logic in it
-from neutron.common import constants
+
from neutron.common import exceptions as exception
from neutron.openstack.common import log
from neutron.plugins.nicira.common import exceptions as nvp_exc
-from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient
-from neutron.version import version_info
LOG = log.getLogger(__name__)
LSWITCH_RESOURCE = "lswitch"
LSWITCHPORT_RESOURCE = "lport/%s" % LSWITCH_RESOURCE
-# Current neutron version
-NEUTRON_VERSION = version_info.release_string()
-
# Maximum page size for a single request
# NOTE(salv-orlando): This might become a version-dependent map should the
# limit be raised in future versions
return uri_path
-def get_cluster_version(cluster):
- """Return major/minor version #."""
- # Get control-cluster nodes
- uri = "/ws.v1/control-cluster/node?_page_length=1&fields=uuid"
- res = do_request(HTTP_GET, uri, cluster=cluster)
- if res["result_count"] == 0:
- return None
- node_uuid = res["results"][0]["uuid"]
- # Get control-cluster node status. It's unsupported to have controllers
- # running different version so we just need the first node version.
- uri = "/ws.v1/control-cluster/node/%s/status" % node_uuid
- res = do_request(HTTP_GET, uri, cluster=cluster)
- version_parts = res["version"].split(".")
- version = "%s.%s" % tuple(version_parts[:2])
- LOG.info(_("NVP controller cluster version: %s"), version)
- return version
-
-
def get_single_query_page(path, cluster, page_cursor=None,
page_length=1000, neutron_only=True):
params = []
return result_list
-def _plug_interface(cluster, lswitch_id, lport_id, att_obj):
- uri = _build_uri_path(LSWITCHPORT_RESOURCE, lport_id, lswitch_id,
- is_attachment=True)
- return do_request(HTTP_PUT, uri, json.dumps(att_obj),
- cluster=cluster)
-
-
-#------------------------------------------------------------------------------
-# Security Profile convenience functions.
-#------------------------------------------------------------------------------
-EXT_SECURITY_PROFILE_ID_SCOPE = 'nova_spid'
-TENANT_ID_SCOPE = 'os_tid'
-
-
def format_exception(etype, e, exception_locals):
"""Consistent formatting for exceptions.
raise exception.NotFound()
except NvpApiClient.ReadOnlyMode:
raise nvp_exc.MaintenanceInProgress()
-
-
-def mk_body(**kwargs):
- """Convenience function creates and dumps dictionary to string.
-
- :param kwargs: the key/value pirs to be dumped into a json string.
- :returns: a json string.
- """
- return json.dumps(kwargs, ensure_ascii=False)
-
-
-# -----------------------------------------------------------------------------
-# Security Group API Calls
-# -----------------------------------------------------------------------------
-def create_security_profile(cluster, tenant_id, security_profile):
- path = "/ws.v1/security-profile"
- # Allow all dhcp responses and all ingress traffic
- hidden_rules = {'logical_port_egress_rules':
- [{'ethertype': 'IPv4',
- 'protocol': constants.PROTO_NUM_UDP,
- 'port_range_min': constants.DHCP_RESPONSE_PORT,
- 'port_range_max': constants.DHCP_RESPONSE_PORT,
- 'ip_prefix': '0.0.0.0/0'}],
- 'logical_port_ingress_rules':
- [{'ethertype': 'IPv4'},
- {'ethertype': 'IPv6'}]}
- tags = [dict(scope='os_tid', tag=tenant_id),
- dict(scope='quantum', tag=NEUTRON_VERSION)]
- display_name = utils.check_and_truncate(security_profile.get('name'))
- body = mk_body(
- tags=tags, display_name=display_name,
- logical_port_ingress_rules=(
- hidden_rules['logical_port_ingress_rules']),
- logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
- )
- rsp = do_request(HTTP_POST, path, body, cluster=cluster)
- if security_profile.get('name') == 'default':
- # If security group is default allow ip traffic between
- # members of the same security profile is allowed and ingress traffic
- # from the switch
- rules = {'logical_port_egress_rules': [{'ethertype': 'IPv4',
- 'profile_uuid': rsp['uuid']},
- {'ethertype': 'IPv6',
- 'profile_uuid': rsp['uuid']}],
- 'logical_port_ingress_rules': [{'ethertype': 'IPv4'},
- {'ethertype': 'IPv6'}]}
-
- update_security_group_rules(cluster, rsp['uuid'], rules)
- LOG.debug(_("Created Security Profile: %s"), rsp)
- return rsp
-
-
-def update_security_group_rules(cluster, spid, rules):
- path = "/ws.v1/security-profile/%s" % spid
-
- # Allow all dhcp responses in
- rules['logical_port_egress_rules'].append(
- {'ethertype': 'IPv4', 'protocol': constants.PROTO_NUM_UDP,
- 'port_range_min': constants.DHCP_RESPONSE_PORT,
- 'port_range_max': constants.DHCP_RESPONSE_PORT,
- 'ip_prefix': '0.0.0.0/0'})
- # If there are no ingress rules add bunk rule to drop all ingress traffic
- if not rules['logical_port_ingress_rules']:
- rules['logical_port_ingress_rules'].append(
- {'ethertype': 'IPv4', 'ip_prefix': '127.0.0.1/32'})
- try:
- body = mk_body(
- logical_port_ingress_rules=rules['logical_port_ingress_rules'],
- logical_port_egress_rules=rules['logical_port_egress_rules'])
- rsp = do_request(HTTP_PUT, path, body, cluster=cluster)
- except exception.NotFound as e:
- LOG.error(format_exception("Unknown", e, locals()))
- #FIXME(salvatore-orlando): This should not raise NeutronException
- raise exception.NeutronException()
- LOG.debug(_("Updated Security Profile: %s"), rsp)
- return rsp
-
-
-def delete_security_profile(cluster, spid):
- path = "/ws.v1/security-profile/%s" % spid
- try:
- do_request(HTTP_DELETE, path, cluster=cluster)
- except exception.NotFound:
- # This is not necessarily an error condition
- LOG.warn(_("Unable to find security profile %s on NSX backend"),
- spid)
- raise
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2014 VMware, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import mock
+
+from neutron.plugins.nicira.common import config # noqa
+from neutron.plugins.nicira import nsx_cluster
+from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import nvplib
+from neutron.tests import base
+from neutron.tests.unit.nicira import fake_nvpapiclient
+from neutron.tests.unit.nicira import NVPAPI_NAME
+from neutron.tests.unit.nicira import STUBS_PATH
+from neutron.tests.unit import test_api_v2
+
+_uuid = test_api_v2._uuid
+
+
+class NsxlibTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ # mock nvp api client
+ self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
+ self.mock_nsxapi = mock.patch(NVPAPI_NAME, autospec=True)
+ instance = self.mock_nsxapi.start()
+ instance.return_value.login.return_value = "the_cookie"
+ fake_version = getattr(self, 'fake_version', "3.0")
+ instance.return_value.get_nvp_version.return_value = (
+ NvpApiClient.NVPVersion(fake_version))
+
+ def _fake_request(*args, **kwargs):
+ return self.fc.fake_request(*args, **kwargs)
+
+ instance.return_value.request.side_effect = _fake_request
+ self.fake_cluster = nsx_cluster.NSXCluster(
+ name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
+ default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
+ self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
+ ('1.1.1.1', '999', True),
+ self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
+ self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
+ self.fake_cluster.retries, self.fake_cluster.redirects)
+
+ super(NsxlibTestCase, self).setUp()
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(self.mock_nsxapi.stop)
+
+ def _build_tag_dict(self, tags):
+ # This syntax is needed for python 2.6 compatibility
+ return dict((t['scope'], t['tag']) for t in tags)
+
+
+class NsxlibNegativeBaseTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ # mock nsx api client
+ self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
+ self.mock_nsxapi = mock.patch(NVPAPI_NAME, autospec=True)
+ instance = self.mock_nsxapi.start()
+ instance.return_value.login.return_value = "the_cookie"
+ # Choose 3.0, but the version is irrelevant for the aim of
+ # these tests as calls are throwing up errors anyway
+ fake_version = getattr(self, 'fake_version', "3.0")
+ instance.return_value.get_nvp_version.return_value = (
+ NvpApiClient.NVPVersion(fake_version))
+
+ def _faulty_request(*args, **kwargs):
+ raise nvplib.NvpApiClient.NvpApiException
+
+ instance.return_value.request.side_effect = _faulty_request
+ self.fake_cluster = nsx_cluster.NSXCluster(
+ name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
+ default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
+ self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
+ ('1.1.1.1', '999', True),
+ self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
+ self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
+ self.fake_cluster.retries, self.fake_cluster.redirects)
+
+ super(NsxlibNegativeBaseTestCase, self).setUp()
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(self.mock_nsxapi.stop)
from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import nvplib
-from neutron.tests.unit.nicira.test_nvplib import NsxlibNegativeBaseTestCase
-from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase
+from neutron.tests.unit.nicira.nsxlib import base
from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid
-class L2GatewayNegativeTestCase(NsxlibNegativeBaseTestCase):
+class L2GatewayNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
'pluto')
-class L2GatewayTestCase(NvplibTestCase):
+class L2GatewayTestCase(base.NsxlibTestCase):
def _create_gw_service(self, node_uuid, display_name,
tenant_id='fake_tenant'):
from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import queue as queuelib
from neutron.plugins.nicira import NvpApiClient
-from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase
+from neutron.tests.unit.nicira.nsxlib import base
-class TestLogicalQueueLib(NvplibTestCase):
+class TestLogicalQueueLib(base.NsxlibTestCase):
def setUp(self):
super(TestLogicalQueueLib, self).setUp()
from neutron.common import exceptions
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
-from neutron.tests.unit.nicira.test_nvplib import NsxlibNegativeBaseTestCase
-from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase
+from neutron.tests.unit.nicira.nsxlib import base
from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid
-class TestNatRules(NvplibTestCase):
+class TestNatRules(base.NsxlibTestCase):
def _test_create_lrouter_dnat_rule(self, version):
with mock.patch.object(self.fake_cluster.api_client,
self._test_create_lrouter_dnat_rule('3.1')
-class TestExplicitLRouters(NvplibTestCase):
+class TestExplicitLRouters(base.NsxlibTestCase):
def setUp(self):
self.fake_version = '3.2'
router = {'display_name': router_name,
'uuid': router_id,
- 'tags': [{'scope': 'quantum', 'tag': nvplib.NEUTRON_VERSION},
- {'scope': 'os_tid', 'tag': '%s' % tenant_id}],
+ 'tags': utils.get_tags(os_tid=tenant_id),
'distributed': False,
'routing_config': {'type': 'RoutingTableRoutingConfig',
'_schema': schema},
{'gateway_ip_address': 'fake_address',
'type': 'RouterNextHop'},
'type': 'SingleDefaultRouteImplicitRoutingConfig'},
- 'tags': [{'scope': 'os_tid', 'tag': 'fake_tenant_id'},
- {'scope': 'q_router_id', 'tag': 'pipita_higuain'},
- {'scope': 'quantum',
- 'tag': nvplib.NEUTRON_VERSION}],
+ 'tags': utils.get_tags(os_tid='fake_tenant_id',
+ q_router_id='pipita_higuain'),
'type': 'LogicalRouterConfig'}
self.assertEqual(expected, body)
tenant_id, router_type)
expected = {'display_name': 'fake_router_name',
'routing_config': {'type': 'RoutingTableRoutingConfig'},
- 'tags': [{'scope': 'os_tid', 'tag': 'fake_tenant_id'},
- {'scope': 'q_router_id',
- 'tag': 'marekiaro_hamsik'},
- {'scope': 'quantum',
- 'tag': nvplib.NEUTRON_VERSION}],
+ 'tags': utils.get_tags(os_tid='fake_tenant_id',
+ q_router_id='marekiaro_hamsik'),
'type': 'LogicalRouterConfig'}
self.assertEqual(expected, body)
self.fake_cluster, router_id, new_routes)
-class RouterNegativeTestCase(NsxlibNegativeBaseTestCase):
+class RouterNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_lrouter_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
'new_hop')
-class TestLogicalRouters(NvplibTestCase):
+class TestLogicalRouters(base.NsxlibTestCase):
def _verify_lrouter(self, res_lrouter,
expected_uuid,
--- /dev/null
+# Copyright (c) 2014 VMware, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from neutron.common import exceptions
+from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib
+from neutron.plugins.nicira import nvplib
+from neutron.tests.unit.nicira.nsxlib import base
+
+
+class SecurityProfileTestCase(base.NsxlibTestCase):
+
+ def test_create_and_get_security_profile(self):
+ sec_prof = secgrouplib.create_security_profile(
+ self.fake_cluster, 'pippo', {'name': 'test'})
+ sec_prof_res = nvplib.do_request(
+ secgrouplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 1)
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
+
+ def test_create_and_get_default_security_profile(self):
+ sec_prof = secgrouplib.create_security_profile(
+ self.fake_cluster, 'pippo', {'name': 'default'})
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 3)
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
+
+ def test_update_security_profile_rules(self):
+ sec_prof = secgrouplib.create_security_profile(
+ self.fake_cluster, 'pippo', {'name': 'test'})
+ ingress_rule = {'ethertype': 'IPv4'}
+ egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
+ new_rules = {'logical_port_egress_rules': [egress_rule],
+ 'logical_port_ingress_rules': [ingress_rule]}
+ secgrouplib.update_security_group_rules(
+ self.fake_cluster, sec_prof['uuid'], new_rules)
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
+ self.assertIn(egress_rule,
+ sec_prof_res['logical_port_egress_rules'])
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
+ self.assertIn(ingress_rule,
+ sec_prof_res['logical_port_ingress_rules'])
+
+ def test_update_security_profile_rules_noingress(self):
+ sec_prof = secgrouplib.create_security_profile(
+ self.fake_cluster, 'pippo', {'name': 'test'})
+ hidden_ingress_rule = {'ethertype': 'IPv4',
+ 'ip_prefix': '127.0.0.1/32'}
+ egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
+ new_rules = {'logical_port_egress_rules': [egress_rule],
+ 'logical_port_ingress_rules': []}
+ secgrouplib.update_security_group_rules(
+ self.fake_cluster, sec_prof['uuid'], new_rules)
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
+ self.assertIn(egress_rule,
+ sec_prof_res['logical_port_egress_rules'])
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
+ self.assertIn(hidden_ingress_rule,
+ sec_prof_res['logical_port_ingress_rules'])
+
+ def test_update_non_existing_securityprofile_raises(self):
+ self.assertRaises(exceptions.NeutronException,
+ secgrouplib.update_security_group_rules,
+ self.fake_cluster, 'whatever',
+ {'logical_port_egress_rules': [],
+ 'logical_port_ingress_rules': []})
+
+ def test_delete_security_profile(self):
+ sec_prof = secgrouplib.create_security_profile(
+ self.fake_cluster, 'pippo', {'name': 'test'})
+ secgrouplib.delete_security_profile(
+ self.fake_cluster, sec_prof['uuid'])
+ self.assertRaises(exceptions.NotFound,
+ nvplib.do_request,
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path(
+ 'security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+
+ def test_delete_non_existing_securityprofile_raises(self):
+ self.assertRaises(exceptions.NeutronException,
+ secgrouplib.delete_security_profile,
+ self.fake_cluster, 'whatever')
from neutron.common import exceptions
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import switch as switchlib
-from neutron.tests.unit.nicira.test_nvplib import NvplibTestCase
+from neutron.tests.unit.nicira.nsxlib import base
from neutron.tests.unit import test_api_v2
_uuid = test_api_v2._uuid
-class LogicalSwitchesTestCase(NvplibTestCase):
+class LogicalSwitchesTestCase(base.NsxlibTestCase):
def test_create_and_get_lswitches_single(self):
tenant_id = 'pippo'
self.fake_cluster, 'whatever', ['whatever'])
-class LogicalPortsTestCase(NvplibTestCase):
+class LogicalPortsTestCase(base.NsxlibTestCase):
def _create_switch_and_port(self, tenant_id='pippo',
neutron_port_id='whatever',
def test_plug_interface(self):
lswitch, lport = self._create_switch_and_port()
- switchlib.plug_interface(self.fake_cluster, lswitch['uuid'],
- lport['uuid'], 'VifAttachment', 'fake')
+ switchlib.plug_vif_interface(self.fake_cluster, lswitch['uuid'],
+ lport['uuid'], 'VifAttachment', 'fake')
lport_res = switchlib.get_port(self.fake_cluster,
lswitch['uuid'], lport['uuid'])
self.assertEqual(lport['uuid'], lport_res['uuid'])
from neutron.extensions import l3
from neutron.manager import NeutronManager
from neutron.openstack.common import uuidutils
+from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NeutronServicePlugin as nsp
-from neutron.plugins.nicira import nvplib
from neutron.tests import base
from neutron.tests.unit.nicira import NVPEXT_PATH
from neutron.tests.unit.nicira import SERVICE_PLUGIN_NAME
{'zone_uuid': 'foo_zone',
'transport_type': 'stt'}
]
- self.tags = [
- {'scope': 'quantum', 'tag': nvplib.NEUTRON_VERSION},
- {'scope': 'quantum_net_id', 'tag': 'foo_id'},
- {'scope': 'os_tid', 'tag': self.tenant_id}
- ]
+ self.tags = utils.get_tags(quantum_net_id='foo_id',
+ os_tid=self.tenant_id)
self.cluster = None
def test_create_lswitch_with_basic_args(self):
self.tz_config)
self.assertEqual(self.display_name, result[0])
self.assertEqual(self.tz_config, result[1])
- self.assertEqual(self.tags, result[2])
+ self.assertEqual(sorted(self.tags), sorted(result[2]))
def test_create_lswitch_with_shared_as_kwarg(self):
result = nsp._process_base_create_lswitch_args(self.cluster,
self.tz_config,
shared=True)
expected = self.tags + [{'scope': 'shared', 'tag': 'true'}]
- self.assertEqual(expected, result[2])
+ self.assertEqual(sorted(expected), sorted(result[2]))
def test_create_lswitch_with_shared_as_arg(self):
result = nsp._process_base_create_lswitch_args(self.cluster,
True)
additional_tags = [{'scope': 'shared', 'tag': 'true'}]
expected = self.tags + additional_tags
- self.assertEqual(expected, result[2])
+ self.assertEqual(sorted(expected), sorted(result[2]))
def test_create_lswitch_with_additional_tags(self):
more_tags = [{'scope': 'foo_scope', 'tag': 'foo_tag'}]
self.tz_config,
tags=more_tags)
expected = self.tags + more_tags
- self.assertEqual(expected, result[2])
+ self.assertEqual(sorted(expected), sorted(result[2]))
from neutron.plugins.nicira import nsxlib
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira.NvpApiClient import NVPVersion
-from neutron.plugins.nicira import nvplib
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.nicira import fake_nvpapiclient
from neutron.tests.unit.nicira import get_fake_conf
def test_create_port_maintenance_returns_503(self):
with self.network() as net:
- with mock.patch.object(nvplib, 'do_request',
+ with mock.patch.object(nsxlib.switch, 'do_request',
side_effect=nvp_exc.MaintenanceInProgress):
data = {'port': {'network_id': net['network']['id'],
'admin_state_up': False,
data = {'network': {'name': 'foo',
'admin_state_up': True,
'tenant_id': self._tenant_id}}
- with mock.patch.object(nvplib, 'do_request',
+ with mock.patch.object(nsxlib.switch, 'do_request',
side_effect=nvp_exc.MaintenanceInProgress):
net_req = self.new_create_request('networks', data, self.fmt)
res = net_req.get_response(self.api)
from neutron.db import api as db_api
from neutron.openstack.common import uuidutils
+from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import nsx_utils
+from neutron.plugins.nicira.common import utils
+from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import nvplib
from neutron.tests import base
from neutron.tests.unit.nicira import nicira_method
+from neutron.tests.unit.nicira.nsxlib import base as nsx_base
class NsxUtilsTestCase(base.BaseTestCase):
module_name='nsxlib.router'),
return_value=[]):
self._verify_get_nsx_router_id(None)
+
+ def test_check_and_truncate_name_with_none(self):
+ name = None
+ result = utils.check_and_truncate(name)
+ self.assertEqual('', result)
+
+ def test_check_and_truncate_name_with_short_name(self):
+ name = 'foo_port_name'
+ result = utils.check_and_truncate(name)
+ self.assertEqual(name, result)
+
+ def test_check_and_truncate_name_long_name(self):
+ name = 'this_is_a_port_whose_name_is_longer_than_40_chars'
+ result = utils.check_and_truncate(name)
+ self.assertEqual(len(result), utils.MAX_DISPLAY_NAME_LEN)
+
+ def test_build_uri_path_plain(self):
+ result = nvplib._build_uri_path('RESOURCE')
+ self.assertEqual("%s/%s" % (nvplib.URI_PREFIX, 'RESOURCE'), result)
+
+ def test_build_uri_path_with_field(self):
+ result = nvplib._build_uri_path('RESOURCE', fields='uuid')
+ expected = "%s/%s?fields=uuid" % (nvplib.URI_PREFIX, 'RESOURCE')
+ self.assertEqual(expected, result)
+
+ def test_build_uri_path_with_filters(self):
+ filters = {"tag": 'foo', "tag_scope": "scope_foo"}
+ result = nvplib._build_uri_path('RESOURCE', filters=filters)
+ expected = (
+ "%s/%s?tag_scope=scope_foo&tag=foo" %
+ (nvplib.URI_PREFIX, 'RESOURCE'))
+ self.assertEqual(expected, result)
+
+ def test_build_uri_path_with_resource_id(self):
+ res = 'RESOURCE'
+ res_id = 'resource_id'
+ result = nvplib._build_uri_path(res, resource_id=res_id)
+ expected = "%s/%s/%s" % (nvplib.URI_PREFIX, res, res_id)
+ self.assertEqual(expected, result)
+
+ def test_build_uri_path_with_parent_and_resource_id(self):
+ parent_res = 'RESOURCE_PARENT'
+ child_res = 'RESOURCE_CHILD'
+ res = '%s/%s' % (child_res, parent_res)
+ par_id = 'parent_resource_id'
+ res_id = 'resource_id'
+ result = nvplib._build_uri_path(
+ res, parent_resource_id=par_id, resource_id=res_id)
+ expected = ("%s/%s/%s/%s/%s" %
+ (nvplib.URI_PREFIX, parent_res, par_id, child_res, res_id))
+ self.assertEqual(expected, result)
+
+ def test_build_uri_path_with_attachment(self):
+ parent_res = 'RESOURCE_PARENT'
+ child_res = 'RESOURCE_CHILD'
+ res = '%s/%s' % (child_res, parent_res)
+ par_id = 'parent_resource_id'
+ res_id = 'resource_id'
+ result = nvplib._build_uri_path(res, parent_resource_id=par_id,
+ resource_id=res_id, is_attachment=True)
+ expected = ("%s/%s/%s/%s/%s/%s" %
+ (nvplib.URI_PREFIX, parent_res,
+ par_id, child_res, res_id, 'attachment'))
+ self.assertEqual(expected, result)
+
+ def test_build_uri_path_with_extra_action(self):
+ parent_res = 'RESOURCE_PARENT'
+ child_res = 'RESOURCE_CHILD'
+ res = '%s/%s' % (child_res, parent_res)
+ par_id = 'parent_resource_id'
+ res_id = 'resource_id'
+ result = nvplib._build_uri_path(res, parent_resource_id=par_id,
+ resource_id=res_id, extra_action='doh')
+ expected = ("%s/%s/%s/%s/%s/%s" %
+ (nvplib.URI_PREFIX, parent_res,
+ par_id, child_res, res_id, 'doh'))
+ self.assertEqual(expected, result)
+
+
+class ClusterManagementTestCase(nsx_base.NsxlibTestCase):
+
+ def test_cluster_in_readonly_mode(self):
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'request',
+ side_effect=NvpApiClient.ReadOnlyMode):
+ self.assertRaises(nvp_exc.MaintenanceInProgress,
+ nvplib.do_request, cluster=self.fake_cluster)
+
+ def test_cluster_method_not_implemented(self):
+ self.assertRaises(NvpApiClient.NvpApiException,
+ nvplib.do_request,
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('MY_FAKE_RESOURCE',
+ resource_id='foo'),
+ cluster=self.fake_cluster)
with self._populate_data(ctx, net_size=net_size,
port_size=port_size,
router_size=router_size):
- with mock.patch.object(nvplib, 'MAX_PAGE_SIZE', 15):
+ with mock.patch.object(sync, 'MAX_PAGE_SIZE', 15):
# The following mock is just for counting calls,
# but we will still run the actual function
with mock.patch.object(
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2013 OpenStack Foundation.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# @author: Salvatore Orlando, VMware
-
-import mock
-
-from neutron.common import exceptions
-from neutron.plugins.nicira.common import config # noqa
-from neutron.plugins.nicira.common import exceptions as nvp_exc
-from neutron.plugins.nicira.common import utils
-from neutron.plugins.nicira import nsx_cluster
-from neutron.plugins.nicira import NvpApiClient
-from neutron.plugins.nicira import nvplib
-from neutron.tests import base
-from neutron.tests.unit.nicira import fake_nvpapiclient
-from neutron.tests.unit.nicira import NVPAPI_NAME
-from neutron.tests.unit.nicira import STUBS_PATH
-from neutron.tests.unit import test_api_v2
-
-
-_uuid = test_api_v2._uuid
-
-
-class NvplibTestCase(base.BaseTestCase):
-
- def setUp(self):
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
- instance = self.mock_nvpapi.start()
- instance.return_value.login.return_value = "the_cookie"
- fake_version = getattr(self, 'fake_version', "3.0")
- instance.return_value.get_nvp_version.return_value = (
- NvpApiClient.NVPVersion(fake_version))
-
- def _fake_request(*args, **kwargs):
- return self.fc.fake_request(*args, **kwargs)
-
- instance.return_value.request.side_effect = _fake_request
- self.fake_cluster = nsx_cluster.NSXCluster(
- name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
- default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
- self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
- ('1.1.1.1', '999', True),
- self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
- self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
- self.fake_cluster.retries, self.fake_cluster.redirects)
-
- super(NvplibTestCase, self).setUp()
- self.addCleanup(self.fc.reset_all)
- self.addCleanup(self.mock_nvpapi.stop)
-
- def _build_tag_dict(self, tags):
- # This syntax is needed for python 2.6 compatibility
- return dict((t['scope'], t['tag']) for t in tags)
-
-
-class NsxlibNegativeBaseTestCase(base.BaseTestCase):
-
- def setUp(self):
- # mock nvp api client
- self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
- self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
- instance = self.mock_nvpapi.start()
- instance.return_value.login.return_value = "the_cookie"
- # Choose 3.0, but the version is irrelevant for the aim of
- # these tests as calls are throwing up errors anyway
- fake_version = getattr(self, 'fake_version', "3.0")
- instance.return_value.get_nvp_version.return_value = (
- NvpApiClient.NVPVersion(fake_version))
-
- def _faulty_request(*args, **kwargs):
- raise nvplib.NvpApiClient.NvpApiException
-
- instance.return_value.request.side_effect = _faulty_request
- self.fake_cluster = nsx_cluster.NSXCluster(
- name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
- default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
- self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
- ('1.1.1.1', '999', True),
- self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
- self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
- self.fake_cluster.retries, self.fake_cluster.redirects)
-
- super(NsxlibNegativeBaseTestCase, self).setUp()
- self.addCleanup(self.fc.reset_all)
- self.addCleanup(self.mock_nvpapi.stop)
-
-
-class TestNvplibSecurityProfile(NvplibTestCase):
-
- def test_create_and_get_security_profile(self):
- sec_prof = nvplib.create_security_profile(self.fake_cluster,
- 'pippo', {'name': 'test'})
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
- cluster=self.fake_cluster)
- self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
- # Check for builtin rules
- self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 1)
- self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
-
- def test_create_and_get_default_security_profile(self):
- sec_prof = nvplib.create_security_profile(self.fake_cluster,
- 'pippo',
- {'name': 'default'})
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
- cluster=self.fake_cluster)
- self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
- # Check for builtin rules
- self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 3)
- self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
-
- def test_update_security_profile_rules(self):
- sec_prof = nvplib.create_security_profile(self.fake_cluster,
- 'pippo', {'name': 'test'})
- ingress_rule = {'ethertype': 'IPv4'}
- egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
- new_rules = {'logical_port_egress_rules': [egress_rule],
- 'logical_port_ingress_rules': [ingress_rule]}
- nvplib.update_security_group_rules(self.fake_cluster,
- sec_prof['uuid'],
- new_rules)
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
- cluster=self.fake_cluster)
- self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
- # Check for builtin rules
- self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
- self.assertIn(egress_rule,
- sec_prof_res['logical_port_egress_rules'])
- self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
- self.assertIn(ingress_rule,
- sec_prof_res['logical_port_ingress_rules'])
-
- def test_update_security_profile_rules_noingress(self):
- sec_prof = nvplib.create_security_profile(self.fake_cluster,
- 'pippo', {'name': 'test'})
- hidden_ingress_rule = {'ethertype': 'IPv4',
- 'ip_prefix': '127.0.0.1/32'}
- egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
- new_rules = {'logical_port_egress_rules': [egress_rule],
- 'logical_port_ingress_rules': []}
- nvplib.update_security_group_rules(self.fake_cluster,
- sec_prof['uuid'],
- new_rules)
- sec_prof_res = nvplib.do_request(
- nvplib.HTTP_GET,
- nvplib._build_uri_path('security-profile',
- resource_id=sec_prof['uuid']),
- cluster=self.fake_cluster)
- self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
- # Check for builtin rules
- self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
- self.assertIn(egress_rule,
- sec_prof_res['logical_port_egress_rules'])
- self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
- self.assertIn(hidden_ingress_rule,
- sec_prof_res['logical_port_ingress_rules'])
-
- def test_update_non_existing_securityprofile_raises(self):
- self.assertRaises(exceptions.NeutronException,
- nvplib.update_security_group_rules,
- self.fake_cluster, 'whatever',
- {'logical_port_egress_rules': [],
- 'logical_port_ingress_rules': []})
-
- def test_delete_security_profile(self):
- sec_prof = nvplib.create_security_profile(self.fake_cluster,
- 'pippo', {'name': 'test'})
- nvplib.delete_security_profile(self.fake_cluster, sec_prof['uuid'])
- self.assertRaises(exceptions.NotFound,
- nvplib.do_request,
- nvplib.HTTP_GET,
- nvplib._build_uri_path(
- 'security-profile',
- resource_id=sec_prof['uuid']),
- cluster=self.fake_cluster)
-
- def test_delete_non_existing_securityprofile_raises(self):
- self.assertRaises(exceptions.NeutronException,
- nvplib.delete_security_profile,
- self.fake_cluster, 'whatever')
-
-
-class TestNvplibClusterManagement(NvplibTestCase):
-
- def test_get_cluster_version(self):
-
- def fakedorequest(*args, **kwargs):
- uri = args[1]
- if 'node/xyz' in uri:
- return {'version': '3.0.9999'}
- elif 'node' in uri:
- return {'result_count': 1,
- 'results': [{'uuid': 'xyz'}]}
-
- with mock.patch.object(nvplib, 'do_request', new=fakedorequest):
- version = nvplib.get_cluster_version('whatever')
- self.assertEqual(version, '3.0')
-
- def test_get_cluster_version_no_nodes(self):
- def fakedorequest(*args, **kwargs):
- uri = args[1]
- if 'node' in uri:
- return {'result_count': 0}
-
- with mock.patch.object(nvplib, 'do_request', new=fakedorequest):
- version = nvplib.get_cluster_version('whatever')
- self.assertIsNone(version)
-
- def test_cluster_in_readonly_mode(self):
- with mock.patch.object(self.fake_cluster.api_client,
- 'request',
- side_effect=NvpApiClient.ReadOnlyMode):
- self.assertRaises(nvp_exc.MaintenanceInProgress,
- nvplib.do_request, cluster=self.fake_cluster)
-
- def test_cluster_method_not_implemetned(self):
- self.assertRaises(NvpApiClient.NvpApiException,
- nvplib.do_request,
- nvplib.HTTP_GET,
- nvplib._build_uri_path('MY_FAKE_RESOURCE',
- resource_id='foo'),
- cluster=self.fake_cluster)
-
-
-class NvplibMiscTestCase(base.BaseTestCase):
-
- def test_check_and_truncate_name_with_none(self):
- name = None
- result = utils.check_and_truncate(name)
- self.assertEqual('', result)
-
- def test_check_and_truncate_name_with_short_name(self):
- name = 'foo_port_name'
- result = utils.check_and_truncate(name)
- self.assertEqual(name, result)
-
- def test_check_and_truncate_name_long_name(self):
- name = 'this_is_a_port_whose_name_is_longer_than_40_chars'
- result = utils.check_and_truncate(name)
- self.assertEqual(len(result), utils.MAX_DISPLAY_NAME_LEN)
-
- def test_build_uri_path_plain(self):
- result = nvplib._build_uri_path('RESOURCE')
- self.assertEqual("%s/%s" % (nvplib.URI_PREFIX, 'RESOURCE'), result)
-
- def test_build_uri_path_with_field(self):
- result = nvplib._build_uri_path('RESOURCE', fields='uuid')
- expected = "%s/%s?fields=uuid" % (nvplib.URI_PREFIX, 'RESOURCE')
- self.assertEqual(expected, result)
-
- def test_build_uri_path_with_filters(self):
- filters = {"tag": 'foo', "tag_scope": "scope_foo"}
- result = nvplib._build_uri_path('RESOURCE', filters=filters)
- expected = (
- "%s/%s?tag_scope=scope_foo&tag=foo" %
- (nvplib.URI_PREFIX, 'RESOURCE'))
- self.assertEqual(expected, result)
-
- def test_build_uri_path_with_resource_id(self):
- res = 'RESOURCE'
- res_id = 'resource_id'
- result = nvplib._build_uri_path(res, resource_id=res_id)
- expected = "%s/%s/%s" % (nvplib.URI_PREFIX, res, res_id)
- self.assertEqual(expected, result)
-
- def test_build_uri_path_with_parent_and_resource_id(self):
- parent_res = 'RESOURCE_PARENT'
- child_res = 'RESOURCE_CHILD'
- res = '%s/%s' % (child_res, parent_res)
- par_id = 'parent_resource_id'
- res_id = 'resource_id'
- result = nvplib._build_uri_path(
- res, parent_resource_id=par_id, resource_id=res_id)
- expected = ("%s/%s/%s/%s/%s" %
- (nvplib.URI_PREFIX, parent_res, par_id, child_res, res_id))
- self.assertEqual(expected, result)
-
- def test_build_uri_path_with_attachment(self):
- parent_res = 'RESOURCE_PARENT'
- child_res = 'RESOURCE_CHILD'
- res = '%s/%s' % (child_res, parent_res)
- par_id = 'parent_resource_id'
- res_id = 'resource_id'
- result = nvplib._build_uri_path(res, parent_resource_id=par_id,
- resource_id=res_id, is_attachment=True)
- expected = ("%s/%s/%s/%s/%s/%s" %
- (nvplib.URI_PREFIX, parent_res,
- par_id, child_res, res_id, 'attachment'))
- self.assertEqual(expected, result)
-
- def test_build_uri_path_with_extra_action(self):
- parent_res = 'RESOURCE_PARENT'
- child_res = 'RESOURCE_CHILD'
- res = '%s/%s' % (child_res, parent_res)
- par_id = 'parent_resource_id'
- res_id = 'resource_id'
- result = nvplib._build_uri_path(res, parent_resource_id=par_id,
- resource_id=res_id, extra_action='doh')
- expected = ("%s/%s/%s/%s/%s/%s" %
- (nvplib.URI_PREFIX, parent_res,
- par_id, child_res, res_id, 'doh'))
- self.assertEqual(expected, result)